Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DBZ-8479 Improve DDL parsing logic #217

Merged
merged 3 commits into from
Dec 5, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -118,28 +118,31 @@ else if (message.getOperation() == ReplicationMessage.Operation.DDL) {
offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
offsetContext.setShard(message.getShard());

DdlMetadataExtractor metadataExtractor = new DdlMetadataExtractor(message);
TableId tableId = VitessDatabaseSchema.parse(metadataExtractor.getTable());
offsetContext.event(tableId, message.getCommitTime());
String ddlStatement = message.getStatement();
SchemaChangeEvent.SchemaChangeEventType eventType = metadataExtractor.getSchemaChangeEventType();
SchemaChangeEvent schemaChangeEvent = SchemaChangeEvent.of(
eventType,
partition,
offsetContext,
connectorConfig.getKeyspace(),
null,
ddlStatement,
null,
false);
dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, null, (receiver) -> {
try {
receiver.schemaChangeEvent(schemaChangeEvent);
}
catch (Exception e) {
throw new DebeziumException(e);
}
});
// DDLs events are only published if the schema change history is enabled, so we should skip parsing DDL events if it's disabled
if (connectorConfig.isSchemaChangesHistoryEnabled()) {
DdlMetadataExtractor metadataExtractor = new DdlMetadataExtractor(message);
TableId tableId = VitessDatabaseSchema.parse(metadataExtractor.getTable());
offsetContext.event(tableId, message.getCommitTime());
String ddlStatement = message.getStatement();
SchemaChangeEvent.SchemaChangeEventType eventType = metadataExtractor.getSchemaChangeEventType();
SchemaChangeEvent schemaChangeEvent = SchemaChangeEvent.of(
eventType,
partition,
offsetContext,
connectorConfig.getKeyspace(),
null,
ddlStatement,
null,
false);
dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, null, (receiver) -> {
try {
receiver.schemaChangeEvent(schemaChangeEvent);
}
catch (Exception e) {
throw new DebeziumException(e);
}
});
}
}
else if (message.getOperation().equals(ReplicationMessage.Operation.HEARTBEAT)) {
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.schema.SchemaChangeEvent;

Expand All @@ -17,11 +20,21 @@
*/
public class DdlMetadataExtractor {

private static final Logger LOGGER = LoggerFactory.getLogger(DdlMetadataExtractor.class);

// VStream DDL statements do not contain any database/keyspace, only contains the table name
private static final Pattern TABLE_NAME_PATTERN = Pattern.compile(
"(?i)(CREATE|ALTER|TRUNCATE|DROP|RENAME)\\s+TABLE\\s+['\\\"`]?([\\w]+)['\\\"`]?",
"(CREATE|ALTER|TRUNCATE|DROP|RENAME)\\s+TABLE\\s+([\\w`\\.]+)",
Pattern.CASE_INSENSITIVE);

// Regex to match in-line or multi-line comments (e.g., /* comment */)
private static final Pattern COMMENT_PATTERN = Pattern.compile("/\\*.*?\\*/", Pattern.DOTALL);

// Regex to match single-line comments (e.g., -- comment and # comment)
private static final Pattern SINGLE_LINE_COMMENT_PATTERN = Pattern.compile("(--|#).*?(\r?\n|$)");

private static final String UNKNOWN_TABLE_NAME = "<UNKNOWN>";

private final DdlMessage ddlMessage;
private String operation;
private String table;
Expand All @@ -32,21 +45,52 @@ public DdlMetadataExtractor(ReplicationMessage ddlMessage) {
}

public void extractMetadata() {
Matcher matcher = TABLE_NAME_PATTERN.matcher(this.ddlMessage.getStatement());
String cleanedStatement = removeComments(this.ddlMessage.getStatement());
Matcher matcher = TABLE_NAME_PATTERN.matcher(cleanedStatement);
if (matcher.find()) {
operation = matcher.group(1).split("\s+")[0].toUpperCase();
if (operation.equals("RENAME")) {
operation = "ALTER";
}
table = matcher.group(2);
String tableName = matcher.group(2);
if (tableName.contains(".")) {
String[] parts = tableName.split("\\.");
tableName = parts[1];
}
table = tableName.replaceAll("`", "");
}
}

private String removeComments(String statement) {
statement = COMMENT_PATTERN.matcher(statement).replaceAll("");
statement = SINGLE_LINE_COMMENT_PATTERN.matcher(statement).replaceAll("");
statement = statement.replaceAll("\\s+", " ").trim();
return statement;
}

public SchemaChangeEvent.SchemaChangeEventType getSchemaChangeEventType() {
if (operation == null) {
logUnknownMessage("schema change event type");
// An event type is required to build a schema change event, so if we got an empty event type, default to ALTER
return SchemaChangeEvent.SchemaChangeEventType.ALTER;
}
return SchemaChangeEvent.SchemaChangeEventType.valueOf(operation);
}

public String getTable() {
if (table == null) {
logUnknownMessage("table");
table = UNKNOWN_TABLE_NAME;
}
return VitessDatabaseSchema.buildTableId(ddlMessage.getShard(), ddlMessage.getKeyspace(), table).toDoubleQuotedString();
}

private void logUnknownMessage(String message) {
LOGGER.warn("Unknown {}, keyspace: {}, shard: {}, commit time {}, transaction ID: {}",
message,
ddlMessage.getKeyspace(),
ddlMessage.getShard(),
ddlMessage.getCommitTime(),
ddlMessage.getTransactionId());
}
}
1 change: 0 additions & 1 deletion src/test/docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ FROM vitess/lite:v19.0.4

USER root

RUN apt-key adv --no-tty --recv-keys --keyserver keyserver.ubuntu.com B7B3B788A8D3785C
RUN apt-get update
RUN apt-get install -y sudo curl vim jq

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.awaitility.Awaitility;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1367,7 +1366,6 @@ public void shouldMultiShardConfigSubscriptionHaveMultiShardGtidsInVgtid() throw
}

@Test
@Ignore // TODO: enable the test once DBZ-8432 is fixed
public void shouldMultiShardMultiTaskConfigSubscriptionHaveMultiShardGtidsInVgtid() throws Exception {
final boolean hasMultipleShards = true;

Expand All @@ -1378,7 +1376,8 @@ public void shouldMultiShardMultiTaskConfigSubscriptionHaveMultiShardGtidsInVgti

int expectedRecordsCount = 1;
consumer = testConsumer(expectedRecordsCount);
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TEST_SHARDED_KEYSPACE, TestHelper.PK_FIELD, hasMultipleShards);
// Since there are two tasks and each gets one shard this is expected to only have one shard
assertInsert(INSERT_NUMERIC_TYPES_STMT, schemasAndValuesForNumericTypes(), TEST_SHARDED_KEYSPACE, TestHelper.PK_FIELD, false);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,59 +11,109 @@
import org.junit.Test;

import io.debezium.connector.vitess.TestHelper;
import io.debezium.connector.vitess.VitessDatabaseSchema;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.schema.SchemaChangeEvent;

/**
* @author Thomas Thornton
*/
public class DdlMetadataExtractorTest {

private static String expectedTableName = VitessDatabaseSchema.buildTableId(
TestHelper.TEST_SHARD, TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_TABLE).toDoubleQuotedString();

@Test
public void shouldGetAlterType() {
DdlMessage ddlMessage = new DdlMessage(null, null, "ALTER TABLE foo ADD COLUMN bar",
DdlMessage ddlMessage = new DdlMessage(null, null,
String.format("ALTER TABLE %s ADD COLUMN bar", TestHelper.TEST_TABLE),
TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage);
assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.ALTER);
assertThat(extractor.getTable()).isEqualTo(expectedTableName);
}

@Test
public void shouldGetCreateType() {
DdlMessage ddlMessage = new DdlMessage(null, null, "CREATE TABLE foo",
DdlMessage ddlMessage = new DdlMessage(null, null,
String.format("CREATE TABLE %s", TestHelper.TEST_TABLE),
TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage);
assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.CREATE);
assertThat(extractor.getTable()).isEqualTo(expectedTableName);
}

@Test
public void shouldGetTruncateType() {
DdlMessage ddlMessage = new DdlMessage(null, null, "TRUNCATE TABLE foo",
DdlMessage ddlMessage = new DdlMessage(null, null,
String.format("TRUNCATE TABLE %s", TestHelper.TEST_TABLE),
TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage);
assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.TRUNCATE);
assertThat(extractor.getTable()).isEqualTo(expectedTableName);
}

@Test
public void shouldGetTable() {
DdlMessage ddlMessage = new DdlMessage(null, null, "TRUNCATE TABLE foo",
DdlMessage ddlMessage = new DdlMessage(null, null,
String.format("TRUNCATE TABLE %s", TestHelper.TEST_TABLE),
TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage);
assertThat(extractor.getTable()).isEqualTo("\"0\".\"test_unsharded_keyspace\".\"foo\"");
assertThat(extractor.getTable()).isEqualTo(expectedTableName);
}

@Test
public void shouldGetDropType() {
DdlMessage ddlMessage = new DdlMessage(null, null, "DROP TABLE foo",
DdlMessage ddlMessage = new DdlMessage(null, null,
String.format("DROP TABLE %s", TestHelper.TEST_TABLE),
TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage);
assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.DROP);
assertThat(extractor.getTable()).isEqualTo(expectedTableName);
}

@Test
public void shouldGetRenameType() {
DdlMessage ddlMessage = new DdlMessage(null, null, "RENAME TABLE foo TO bar",
DdlMessage ddlMessage = new DdlMessage(null, null,
String.format("RENAME TABLE %s TO %s_suffix", TestHelper.TEST_TABLE, TestHelper.TEST_TABLE),
TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage);
assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.ALTER);
assertThat(extractor.getTable()).isEqualTo(expectedTableName);
}

@Test
public void shouldParseStatementWithComments() {
DdlMessage ddlMessage = new DdlMessage(null, null,
String.format("rename /* gh-ost */ table `keyspace`.`%s` to `keyspace`.`_table1_del`, `keyspace`.`_table_gho` to `keyspace`.`table`",
TestHelper.TEST_TABLE),
TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage);
assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.ALTER);
assertThat(extractor.getTable()).isEqualTo(expectedTableName);
}

@Test
public void shouldParseStatementWithSingleLineComment() {
DdlMessage ddlMessage = new DdlMessage(null, null,
String.format("create \n # gh-ost \n table `keyspace`.`%s`;", TestHelper.TEST_TABLE),
TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage);
assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.CREATE);
assertThat(extractor.getTable()).isEqualTo(expectedTableName);
}

@Test
public void shouldGracefullyHandleUnparseableStatement() {
DdlMessage ddlMessage = new DdlMessage(null, null, "UNPARSEABLE command to a table",
TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD);
DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage);
final LogInterceptor logInterceptor = new LogInterceptor(DdlMetadataExtractor.class);
String unknownTable = "Unknown table";
String unknownType = "Unknown schema change event type";
assertThat(extractor.getTable()).isEqualTo("\"0\".\"test_unsharded_keyspace\".\"<UNKNOWN>\"");
assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.ALTER);
assertThat(logInterceptor.containsWarnMessage(unknownTable)).isTrue();
assertThat(logInterceptor.containsWarnMessage(unknownType)).isTrue();
}
}
Loading