Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8325 Emit DDL events #215

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -7,26 +7,81 @@

import java.util.Properties;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.config.CommonConnectorConfig;
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.relational.TableId;
import io.debezium.schema.AbstractTopicNamingStrategy;
import io.debezium.util.Collect;
import io.debezium.util.Strings;

/**
* <p>
* Topic naming strategy where only the table name is added. This is used to avoid including
* the shard which is now part of the catalog of the table ID and would be included if
* the DefaultTopicNamingStrategy is being used.
*</p>
* Additionally, supports some Vitess-specific configs:
* <ul>
* <li>
overrideDataChangeTopicPrefix: in the case of multiple connectors for the same keyspace,
a unique `topic.prefix` is required for proper metric reporting, so in order to keep a consistent topic
naming convention, the data change topic prefix can be set here (typically shared between connectors of the same
keyspace)
* </li>
* <li>
* overrideSchemaChangeTopic: in the case of multiple connectors for the same keyspace and `include.schema.changes` being enabled,
* this is used to prevent the ddl events from being written to the topic named `topic.prefix`, which is the default behavior.
* The reason why this is necessary is that the `topic.prefix` may include the table name for uniqueness, so it may actually be the name
* of a data change topic.
* </li>
* </ul>
*/
public class TableTopicNamingStrategy extends AbstractTopicNamingStrategy<TableId> {

private final String overrideDataChangeTopicPrefix;
private static final Logger LOGGER = LoggerFactory.getLogger(TableTopicNamingStrategy.class);

public static final Field OVERRIDE_DATA_CHANGE_TOPIC_PREFIX = Field.create("override.data.change.topic.prefix")
.withDisplayName("Override Data Topic prefix")
.withType(ConfigDef.Type.STRING)
.withWidth(ConfigDef.Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withValidation(CommonConnectorConfig::validateTopicName)
.withDescription("Overrides the topic.prefix used for the data change topic.");

// Optional replacement name for the schema change topic; when unset the
// default (derived from topic.prefix) is used. Validated as a topic name.
public static final Field OVERRIDE_SCHEMA_CHANGE_TOPIC = Field.create("override.schema.change.topic")
        .withDisplayName("Override schema change topic name")
        .withType(ConfigDef.Type.STRING)
        .withWidth(ConfigDef.Width.MEDIUM)
        .withImportance(ConfigDef.Importance.LOW)
        .withValidation(CommonConnectorConfig::validateTopicName)
        .withDescription("Overrides the name of the schema change topic (if not set uses topic.prefix).");

private String overrideDataChangeTopicPrefix;
private String overrideSchemaChangeTopic;

/**
 * Creates the strategy, delegating the initial configuration of the
 * supplied properties to the abstract base strategy.
 *
 * @param props the topic naming strategy configuration properties
 */
public TableTopicNamingStrategy(Properties props) {
    super(props);
}

/**
 * Applies the configuration: validates the Vitess-specific override fields
 * and caches their values for use when resolving topic names.
 *
 * @param props the topic naming strategy configuration properties
 * @throws ConnectException if either override field fails topic-name validation
 */
@Override
public void configure(Properties props) {
    super.configure(props);
    Configuration config = Configuration.from(props);
    // Validate only the strategy-specific fields; common fields are handled
    // by super.configure(props). Fail fast so a bad topic name surfaces at
    // startup rather than at first dispatch.
    final Field.Set configFields = Field.setOf(
            OVERRIDE_DATA_CHANGE_TOPIC_PREFIX,
            OVERRIDE_SCHEMA_CHANGE_TOPIC);
    if (!config.validateAndRecord(configFields, LOGGER::error)) {
        throw new ConnectException("Unable to validate config.");
    }

    overrideDataChangeTopicPrefix = config.getString(OVERRIDE_DATA_CHANGE_TOPIC_PREFIX);
    overrideSchemaChangeTopic = config.getString(OVERRIDE_SCHEMA_CHANGE_TOPIC);
}

public static TableTopicNamingStrategy create(CommonConnectorConfig config) {
Expand All @@ -44,4 +99,22 @@ public String dataChangeTopic(TableId id) {
}
return topicNames.computeIfAbsent(id, t -> sanitizedTopicName(topicName));
}

/**
 * Resolves the schema change topic name. When an override has been
 * configured it takes precedence; otherwise the default name produced by
 * the base strategy is used.
 *
 * @return String representing the schema change topic name.
 */
@Override
public String schemaChangeTopic() {
    return Strings.isNullOrBlank(overrideSchemaChangeTopic)
            ? super.schemaChangeTopic()
            : overrideSchemaChangeTopic;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,20 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
+ "'false' (the default) omits the fields; "
+ "'true' converts the field into an implementation dependent binary representation.");

// Needed for backward compatibility, otherwise all upgraded connectors will start publishing schema change events
// by default.
public static final Field INCLUDE_SCHEMA_CHANGES = Field.create("include.schema.changes")
        .withDisplayName("Include database schema changes")
        .withType(Type.BOOLEAN)
        .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 0))
        .withWidth(Width.SHORT)
        .withImportance(ConfigDef.Importance.MEDIUM)
        .withDescription("Whether the connector should publish changes in the database schema to a Kafka topic with "
                + "the same name as the database server ID. Each schema change will be recorded using a key that "
                + "contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s). "
                + "The default is 'false'. This is independent of how the connector internally records database schema history.")
        .withDefault(false);

public static final Field OFFSET_STORAGE_PER_TASK = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.per.task")
.withDisplayName("Store offsets per task")
.withType(Type.BOOLEAN)
Expand All @@ -360,14 +374,6 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue
+ "'false' (the default) offsets are stored as a single unit under the database name. "
+ "'true' stores the offsets per task id");

public static final Field OVERRIDE_DATA_CHANGE_TOPIC_PREFIX = Field.create("override.data.change.topic.prefix")
.withDisplayName("Override Data Topic prefix")
.withType(Type.STRING)
.withWidth(Width.MEDIUM)
.withImportance(ConfigDef.Importance.LOW)
.withValidation(CommonConnectorConfig::validateTopicName)
.withDescription("Overrides the topic.prefix used for the data change topic.");

public static final Field OFFSET_STORAGE_TASK_KEY_GEN = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.task.key.gen")
.withDisplayName("Offset storage task key generation number")
.withType(Type.INT)
Expand Down Expand Up @@ -478,7 +484,6 @@ private static int validateTimePrecisionMode(Configuration config, Field field,
GRPC_HEADERS,
GRPC_MAX_INBOUND_MESSAGE_SIZE,
BINARY_HANDLING_MODE,
OVERRIDE_DATA_CHANGE_TOPIC_PREFIX,
SCHEMA_NAME_ADJUSTMENT_MODE,
OFFSET_STORAGE_PER_TASK,
OFFSET_STORAGE_TASK_KEY_GEN,
Expand Down Expand Up @@ -553,6 +558,11 @@ public String getConnectorName() {
return Module.name();
}

@Override
public boolean isSchemaChangesHistoryEnabled() {
    // Driven by INCLUDE_SCHEMA_CHANGES (defaults to false) so that upgraded
    // connectors do not start emitting schema change events unless opted in.
    return getConfig().getBoolean(INCLUDE_SCHEMA_CHANGES);
}

@Override
public TemporalPrecisionMode getTemporalPrecisionMode() {
return TemporalPrecisionMode.parse(getConfig().getString(TIME_PRECISION_MODE));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.DebeziumException;
import io.debezium.connector.vitess.connection.DdlMetadataExtractor;
import io.debezium.connector.vitess.connection.ReplicationConnection;
import io.debezium.connector.vitess.connection.ReplicationMessage;
import io.debezium.connector.vitess.connection.ReplicationMessageProcessor;
Expand All @@ -19,6 +21,7 @@
import io.debezium.pipeline.EventDispatcher;
import io.debezium.pipeline.source.spi.StreamingChangeEventSource;
import io.debezium.relational.TableId;
import io.debezium.schema.SchemaChangeEvent;
import io.debezium.util.Clock;
import io.debezium.util.DelayStrategy;

Expand Down Expand Up @@ -108,10 +111,36 @@ else if (message.getOperation() == ReplicationMessage.Operation.COMMIT) {
}
return;
}
else if (message.getOperation() == ReplicationMessage.Operation.DDL || message.getOperation() == ReplicationMessage.Operation.OTHER) {
// DDL event or OTHER event
else if (message.getOperation() == ReplicationMessage.Operation.OTHER) {
offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
}
else if (message.getOperation() == ReplicationMessage.Operation.DDL) {
offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
offsetContext.setShard(message.getShard());

DdlMetadataExtractor metadataExtractor = new DdlMetadataExtractor(message);
TableId tableId = VitessDatabaseSchema.parse(metadataExtractor.getTable());
offsetContext.event(tableId, message.getCommitTime());
String ddlStatement = message.getStatement();
SchemaChangeEvent.SchemaChangeEventType eventType = metadataExtractor.getSchemaChangeEventType();
SchemaChangeEvent schemaChangeEvent = SchemaChangeEvent.of(
eventType,
partition,
offsetContext,
connectorConfig.getKeyspace(),
null,
ddlStatement,
null,
false);
dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, null, (receiver) -> {
try {
receiver.schemaChangeEvent(schemaChangeEvent);
}
catch (Exception e) {
throw new DebeziumException(e);
}
});
}
else if (message.getOperation().equals(ReplicationMessage.Operation.HEARTBEAT)) {
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@ public class DdlMessage implements ReplicationMessage {
private final String transactionId;
private final Instant commitTime;
private final Operation operation;
private final String statement;
private final String shard;
private final String keyspace;

/**
 * Creates a replication message describing a DDL event; the operation is
 * always {@link Operation#DDL}.
 *
 * @param transactionId identifier of the transaction this event belongs to
 * @param commitTime    commit timestamp of the event
 * @param statement     the raw DDL statement text
 * @param keyspace      the keyspace the statement applies to
 * @param shard         the shard the statement was observed on
 */
public DdlMessage(String transactionId, Instant commitTime, String statement, String keyspace, String shard) {
    this.transactionId = transactionId;
    this.commitTime = commitTime;
    this.operation = Operation.DDL;
    this.statement = statement;
    this.keyspace = keyspace;
    this.shard = shard;
}

@Override
Expand All @@ -41,9 +47,19 @@ public String getTable() {
throw new UnsupportedOperationException();
}

@Override
public String getStatement() {
    // Raw DDL statement text carried by this message.
    return statement;
}

@Override
public String getKeyspace() {
    // Keyspace the DDL statement applies to, as captured at construction.
    return keyspace;
}

/**
 * Returns the shard this DDL event was observed on.
 */
@Override
public String getShard() {
    return shard;
}

@Override
Expand All @@ -67,8 +83,14 @@ public String toString() {
+ "transactionId='"
+ transactionId
+ '\''
+ ", keyspace="
+ keyspace
+ ", shard="
+ shard
+ ", commitTime="
+ commitTime
+ ", statement="
+ statement
+ ", operation="
+ operation
+ '}';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess.connection;

import java.util.regex.Matcher;
import java.util.regex.Pattern;

import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;

/**
* @author Thomas Thornton
*/
public class DdlMetadataExtractor {

    // VStream DDL statements do not contain any database/keyspace, only contains the table name.
    // Group 1 captures the DDL verb, group 2 the (optionally quoted) table name.
    private static final Pattern TABLE_NAME_PATTERN = Pattern.compile(
            "(?i)(CREATE|ALTER|TRUNCATE|DROP|RENAME)\\s+TABLE\\s+['\\\"`]?([\\w]+)['\\\"`]?",
            Pattern.CASE_INSENSITIVE);

    private final DdlMessage ddlMessage;
    private String operation;
    private String table;

    public DdlMetadataExtractor(ReplicationMessage ddlMessage) {
        this.ddlMessage = (DdlMessage) ddlMessage;
        extractMetadata();
    }

    /**
     * Parses the DDL statement and caches the operation keyword and table
     * name. If the statement does not match the pattern, both remain null
     * (NOTE(review): getSchemaChangeEventType would then throw — confirm
     * callers only pass table-level DDL).
     */
    public void extractMetadata() {
        Matcher matcher = TABLE_NAME_PATTERN.matcher(this.ddlMessage.getStatement());
        if (matcher.find()) {
            // Regex whitespace class must be written "\\s+" in a string literal.
            operation = matcher.group(1).split("\\s+")[0].toUpperCase();
            // RENAME is reported as ALTER — presumably no dedicated
            // SchemaChangeEventType exists for it; confirm against the enum.
            if (operation.equals("RENAME")) {
                operation = "ALTER";
            }
            table = matcher.group(2);
        }
    }

    /**
     * Maps the extracted DDL verb onto the corresponding schema change event type.
     */
    public SchemaChangeEvent.SchemaChangeEventType getSchemaChangeEventType() {
        return SchemaChangeEvent.SchemaChangeEventType.valueOf(operation);
    }

    /**
     * Builds the fully qualified, double-quoted table identifier from the
     * message's shard, keyspace, and the parsed table name.
     */
    public String getTable() {
        return VitessDatabaseSchema.buildTableId(ddlMessage.getShard(), ddlMessage.getKeyspace(), table).toDoubleQuotedString();
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ public String getTransactionId() {
throw new UnsupportedOperationException();
}

@Override
public String getKeyspace() {
    // Not applicable to this message type.
    throw new UnsupportedOperationException();
}

@Override
public String getTable() {
throw new UnsupportedOperationException();
Expand All @@ -44,6 +49,11 @@ public String getShard() {
throw new UnsupportedOperationException();
}

@Override
public String getStatement() {
    // Not applicable to this message type.
    throw new UnsupportedOperationException();
}

@Override
public List<Column> getOldTupleList() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,21 @@ public String getTransactionId() {
return transactionId;
}

@Override
public String getKeyspace() {
    // Not applicable to this message type.
    throw new UnsupportedOperationException();
}

@Override
public String getTable() {
throw new UnsupportedOperationException();
}

@Override
public String getStatement() {
    // Not applicable to this message type.
    throw new UnsupportedOperationException();
}

@Override
public String getShard() {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ interface ColumnValue<T> {

String getTransactionId();

String getKeyspace();

String getTable();

String getShard();
Expand All @@ -80,6 +82,8 @@ interface ColumnValue<T> {

List<Column> getNewTupleList();

String getStatement();

/**
 * A message is transactional when it marks a transaction boundary
 * (BEGIN or COMMIT).
 */
default boolean isTransactionalMessage() {
    final Operation op = getOperation();
    return op == Operation.BEGIN || op == Operation.COMMIT;
}
Expand Down
Loading