From 8f8c30e591f9a9f09b14f812aa4dcb124050f5d8 Mon Sep 17 00:00:00 2001 From: twthorn Date: Thu, 7 Nov 2024 16:26:55 -0500 Subject: [PATCH 1/3] DBZ-8325 Emit DDL events --- .../VitessStreamingChangeEventSource.java | 33 +++++- .../vitess/connection/DdlMessage.java | 26 ++++- .../connection/DdlMetadataExtractor.java | 52 +++++++++ .../vitess/connection/HeartbeatMessage.java | 10 ++ .../vitess/connection/OtherMessage.java | 10 ++ .../vitess/connection/ReplicationMessage.java | 4 + .../connection/TransactionalMessage.java | 14 ++- .../VStreamOutputMessageDecoder.java | 9 +- .../VStreamOutputReplicationMessage.java | 13 +++ .../VitessReplicationConnection.java | 4 +- .../debezium/connector/vitess/TestHelper.java | 3 + .../vitess/VitessBigIntUnsignedTest.java | 1 + .../vitess/VitessChangeRecordEmitterTest.java | 7 +- .../connector/vitess/VitessConnectorIT.java | 110 +++++++++++++++++- .../vitess/connection/DdlMessageTest.java | 59 ++++++++++ .../connection/DdlMetadataExtractorTest.java | 69 +++++++++++ src/test/resources/vitess_create_tables.ddl | 12 ++ src/test/resources/vitess_vschema.json | 12 ++ 18 files changed, 435 insertions(+), 13 deletions(-) create mode 100644 src/main/java/io/debezium/connector/vitess/connection/DdlMetadataExtractor.java create mode 100644 src/test/java/io/debezium/connector/vitess/connection/DdlMessageTest.java create mode 100644 src/test/java/io/debezium/connector/vitess/connection/DdlMetadataExtractorTest.java diff --git a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java index e7ba94c4..8870fa76 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java +++ b/src/main/java/io/debezium/connector/vitess/VitessStreamingChangeEventSource.java @@ -11,6 +11,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.debezium.DebeziumException; +import io.debezium.connector.vitess.connection.DdlMetadataExtractor; import io.debezium.connector.vitess.connection.ReplicationConnection; import io.debezium.connector.vitess.connection.ReplicationMessage; import io.debezium.connector.vitess.connection.ReplicationMessageProcessor; @@ -19,6 +21,7 @@ import io.debezium.pipeline.EventDispatcher; import io.debezium.pipeline.source.spi.StreamingChangeEventSource; import io.debezium.relational.TableId; +import io.debezium.schema.SchemaChangeEvent; import io.debezium.util.Clock; import io.debezium.util.DelayStrategy; @@ -108,10 +111,36 @@ else if (message.getOperation() == ReplicationMessage.Operation.COMMIT) { } return; } - else if (message.getOperation() == ReplicationMessage.Operation.DDL || message.getOperation() == ReplicationMessage.Operation.OTHER) { - // DDL event or OTHER event + else if (message.getOperation() == ReplicationMessage.Operation.OTHER) { offsetContext.rotateVgtid(newVgtid, message.getCommitTime()); } + else if (message.getOperation() == ReplicationMessage.Operation.DDL) { + offsetContext.rotateVgtid(newVgtid, message.getCommitTime()); + offsetContext.setShard(message.getShard()); + + DdlMetadataExtractor metadataExtractor = new DdlMetadataExtractor(message); + TableId tableId = VitessDatabaseSchema.parse(metadataExtractor.getTable()); + offsetContext.event(tableId, message.getCommitTime()); + String ddlStatement = message.getStatement(); + SchemaChangeEvent.SchemaChangeEventType eventType = metadataExtractor.getSchemaChangeEventType(); + SchemaChangeEvent schemaChangeEvent = SchemaChangeEvent.of( + eventType, + partition, + offsetContext, + connectorConfig.getKeyspace(), + null, + ddlStatement, + null, + false); + dispatcher.dispatchSchemaChangeEvent(partition, offsetContext, null, (receiver) -> { + try { + receiver.schemaChangeEvent(schemaChangeEvent); + } + catch (Exception e) { + throw new DebeziumException(e); + } + }); + } else if (message.getOperation().equals(ReplicationMessage.Operation.HEARTBEAT)) { dispatcher.dispatchHeartbeatEvent(partition, offsetContext); } diff --git a/src/main/java/io/debezium/connector/vitess/connection/DdlMessage.java b/src/main/java/io/debezium/connector/vitess/connection/DdlMessage.java index 293ce4f7..2eb94851 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/DdlMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/DdlMessage.java @@ -14,11 +14,17 @@ public class DdlMessage implements ReplicationMessage { private final String transactionId; private final Instant commitTime; private final Operation operation; + private final String statement; + private final String shard; + private final String keyspace; - public DdlMessage(String transactionId, Instant commitTime) { + public DdlMessage(String transactionId, Instant commitTime, String statement, String keyspace, String shard) { this.transactionId = transactionId; this.commitTime = commitTime; this.operation = Operation.DDL; + this.statement = statement; + this.keyspace = keyspace; + this.shard = shard; } @Override @@ -41,9 +47,19 @@ public String getTable() { throw new UnsupportedOperationException(); } + @Override + public String getStatement() { + return statement; + } + + @Override + public String getKeyspace() { + return keyspace; + } + @Override public String getShard() { - throw new UnsupportedOperationException(); + return shard; } @Override @@ -67,8 +83,14 @@ public String toString() { + "transactionId='" + transactionId + '\'' + + ", keyspace=" + + keyspace + + ", shard=" + + shard + ", commitTime=" + commitTime + + ", statement=" + + statement + ", operation=" + operation + '}'; diff --git a/src/main/java/io/debezium/connector/vitess/connection/DdlMetadataExtractor.java b/src/main/java/io/debezium/connector/vitess/connection/DdlMetadataExtractor.java new file mode 100644 index 00000000..d5b7d90b --- /dev/null +++ b/src/main/java/io/debezium/connector/vitess/connection/DdlMetadataExtractor.java @@ -0,0 +1,52 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.vitess.connection; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import io.debezium.connector.vitess.VitessDatabaseSchema; +import io.debezium.schema.SchemaChangeEvent; + +/** + * @author Thomas Thornton + */ +public class DdlMetadataExtractor { + + // VStream DDL statements do not contain any database/keyspace, only contains the table name + private static final Pattern TABLE_NAME_PATTERN = Pattern.compile( + "(?i)(CREATE|ALTER|TRUNCATE|DROP|RENAME)\\s+TABLE\\s+['\\\"`]?([\\w]+)['\\\"`]?", + Pattern.CASE_INSENSITIVE); + + private final DdlMessage ddlMessage; + private String operation; + private String table; + + public DdlMetadataExtractor(ReplicationMessage ddlMessage) { + this.ddlMessage = (DdlMessage) ddlMessage; + extractMetadata(); + } + + public void extractMetadata() { + Matcher matcher = TABLE_NAME_PATTERN.matcher(this.ddlMessage.getStatement()); + if (matcher.find()) { + operation = matcher.group(1).split("\s+")[0].toUpperCase(); + if (operation.equals("RENAME")) { + operation = "ALTER"; + } + table = matcher.group(2); + } + } + + public SchemaChangeEvent.SchemaChangeEventType getSchemaChangeEventType() { + return SchemaChangeEvent.SchemaChangeEventType.valueOf(operation); + } + + public String getTable() { + return VitessDatabaseSchema.buildTableId(ddlMessage.getShard(), ddlMessage.getKeyspace(), table).toDoubleQuotedString(); + } +} diff --git a/src/main/java/io/debezium/connector/vitess/connection/HeartbeatMessage.java b/src/main/java/io/debezium/connector/vitess/connection/HeartbeatMessage.java index b3c8a4c7..8fb0ece2 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/HeartbeatMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/HeartbeatMessage.java @@ -34,6 +34,11 @@ public String getTransactionId() { throw new UnsupportedOperationException(); } + @Override + public String getKeyspace() { + throw new UnsupportedOperationException(); + } + @Override public String getTable() { throw new UnsupportedOperationException(); @@ -44,6 +49,11 @@ public String getShard() { throw new UnsupportedOperationException(); } + @Override + public String getStatement() { + throw new UnsupportedOperationException(); + } + @Override public List getOldTupleList() { throw new UnsupportedOperationException(); diff --git a/src/main/java/io/debezium/connector/vitess/connection/OtherMessage.java b/src/main/java/io/debezium/connector/vitess/connection/OtherMessage.java index 2b5179ea..3980cc93 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/OtherMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/OtherMessage.java @@ -36,11 +36,21 @@ public String getTransactionId() { return transactionId; } + @Override + public String getKeyspace() { + throw new UnsupportedOperationException(); + } + @Override public String getTable() { throw new UnsupportedOperationException(); } + @Override + public String getStatement() { + throw new UnsupportedOperationException(); + } + @Override public String getShard() { throw new UnsupportedOperationException(); diff --git a/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessage.java b/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessage.java index e453d188..877d19bc 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/ReplicationMessage.java @@ -72,6 +72,8 @@ interface ColumnValue { String getTransactionId(); + String getKeyspace(); + String getTable(); String getShard(); @@ -80,6 +82,8 @@ interface ColumnValue { List getNewTupleList(); + String getStatement(); + default boolean isTransactionalMessage() { return getOperation() == Operation.BEGIN || getOperation() == Operation.COMMIT; } diff --git a/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java b/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java index 76c38796..500079d7 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/TransactionalMessage.java @@ -14,12 +14,14 @@ public class TransactionalMessage implements ReplicationMessage { private final String transactionId; private final Instant commitTime; private final Operation operation; + private final String keyspace; private final String shard; - public TransactionalMessage(Operation operation, String transactionId, Instant commitTime, String shard) { + public TransactionalMessage(Operation operation, String transactionId, Instant commitTime, String keyspace, String shard) { this.transactionId = transactionId; this.commitTime = commitTime; this.operation = operation; + this.keyspace = keyspace; this.shard = shard; } @@ -38,6 +40,11 @@ public String getTransactionId() { return transactionId; } + @Override + public String getKeyspace() { + throw new UnsupportedOperationException(); + } + @Override public String getTable() { throw new UnsupportedOperationException(); @@ -48,6 +55,11 @@ public String getShard() { return shard; } + @Override + public String getStatement() { + throw new UnsupportedOperationException(); + } + @Override public List getOldTupleList() { throw new UnsupportedOperationException(); diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java index 7bcf61f7..135e965c 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputMessageDecoder.java @@ -100,7 +100,7 @@ private void handleDdl(Binlogdata.VEvent vEvent, ReplicationMessageProcessor pro this.transactionId = newVgtid.toString(); } processor.process( - new DdlMessage(transactionId, eventTimestamp), newVgtid, false); + new DdlMessage(transactionId, eventTimestamp, vEvent.getStatement(), vEvent.getKeyspace(), vEvent.getShard()), newVgtid, false); } private void handleOther(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid) @@ -133,7 +133,7 @@ private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProc } LOGGER.trace("Timestamp of begin transaction: {}", eventTimestamp); processor.process( - new TransactionalMessage(Operation.BEGIN, transactionId, eventTimestamp, vEvent.getShard()), newVgtid, false); + new TransactionalMessage(Operation.BEGIN, transactionId, eventTimestamp, vEvent.getKeyspace(), vEvent.getShard()), newVgtid, false); } private void handleCommitMessage( @@ -147,7 +147,7 @@ private void handleCommitMessage( } LOGGER.trace("Timestamp of commit transaction: {}", commitTimestamp); processor.process( - new TransactionalMessage(Operation.COMMIT, transactionId, eventTimestamp, vEvent.getShard()), newVgtid, false); + new TransactionalMessage(Operation.COMMIT, transactionId, eventTimestamp, vEvent.getKeyspace(), vEvent.getShard()), newVgtid, false); } private void decodeRows(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid, boolean isLastRowEventOfTransaction) @@ -216,6 +216,7 @@ private void decodeInsert( Operation.INSERT, commitTimestamp, transactionId, + schemaName, tableId.toDoubleQuotedString(), shard, null, @@ -256,6 +257,7 @@ private void decodeUpdate( Operation.UPDATE, commitTimestamp, transactionId, + schemaName, tableId.toDoubleQuotedString(), shard, oldColumns, @@ -294,6 +296,7 @@ private void decodeDelete( Operation.DELETE, commitTimestamp, transactionId, + schemaName, tableId.toDoubleQuotedString(), shard, columns, diff --git a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputReplicationMessage.java b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputReplicationMessage.java index cb806417..98275d7c 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputReplicationMessage.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VStreamOutputReplicationMessage.java @@ -17,6 +17,7 @@ public class VStreamOutputReplicationMessage implements ReplicationMessage { private final Operation op; private final Instant commitTimestamp; private final String transactionId; + private final String keyspace; private final String table; private final String shard; private final List oldColumns; @@ -26,6 +27,7 @@ public VStreamOutputReplicationMessage( Operation op, Instant commitTimestamp, String transactionId, + String keyspace, String table, String shard, List oldColumns, @@ -33,6 +35,7 @@ public VStreamOutputReplicationMessage( this.op = op; this.commitTimestamp = commitTimestamp; this.transactionId = transactionId; + this.keyspace = keyspace; this.table = table; this.shard = shard; this.oldColumns = oldColumns; @@ -54,6 +57,11 @@ public String getTransactionId() { return transactionId; } + @Override + public String getKeyspace() { + return keyspace; + } + @Override public String getTable() { return table; @@ -64,6 +72,11 @@ public String getShard() { return shard; } + @Override + public String getStatement() { + throw new UnsupportedOperationException(); + } + @Override public List getOldTupleList() { return oldColumns; diff --git a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java index 81369899..93b7937b 100644 --- a/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java +++ b/src/main/java/io/debezium/connector/vitess/connection/VitessReplicationConnection.java @@ -414,8 +414,8 @@ public static Vgtid defaultVgtid(VitessConnectorConfig config) { if (config.offsetStoragePerTask()) { List shards = config.getVitessTaskKeyShards(); vgtid = config.getVitessTaskVgtid(); - LOGGER.info("VGTID '{}' is set for the keyspace: {} shards: {}", - vgtid, config.getKeyspace(), shards); + LOGGER.info("VGTID is set for the keyspace: {}, shards: {}, vgtid: {}", + config.getKeyspace(), shards, vgtid); } else { // If offset storage per task is disabled, then find the vgtid elsewhere diff --git a/src/test/java/io/debezium/connector/vitess/TestHelper.java b/src/test/java/io/debezium/connector/vitess/TestHelper.java index ca0b77fa..c7aa5cef 100644 --- a/src/test/java/io/debezium/connector/vitess/TestHelper.java +++ b/src/test/java/io/debezium/connector/vitess/TestHelper.java @@ -31,6 +31,7 @@ import io.debezium.connector.vitess.connection.ReplicationMessageColumn; import io.debezium.connector.vitess.connection.VitessTabletType; import io.debezium.connector.vitess.pipeline.txmetadata.ShardEpochMap; +import io.debezium.embedded.EmbeddedEngineConfig; import io.debezium.relational.RelationalDatabaseConnectorConfig; import io.debezium.relational.TableId; import io.vitess.proto.Query; @@ -149,6 +150,8 @@ public static Configuration.Builder defaultConfig(boolean hasMultipleShards, .with(VitessConnectorConfig.VTGATE_PORT, VTGATE_PORT) .with(VitessConnectorConfig.VTGATE_USER, USERNAME) .with(VitessConnectorConfig.VTGATE_PASSWORD, PASSWORD) + // Only wait 5 seconds to stop, not default of 5 minutes + .with(EmbeddedEngineConfig.WAIT_FOR_COMPLETION_BEFORE_INTERRUPT_MS, 5000) .with(VitessConnectorConfig.POLL_INTERVAL_MS, 100); if (!Strings.isNullOrEmpty(tableInclude)) { builder.with(RelationalDatabaseConnectorConfig.TABLE_INCLUDE_LIST, tableInclude); diff --git a/src/test/java/io/debezium/connector/vitess/VitessBigIntUnsignedTest.java b/src/test/java/io/debezium/connector/vitess/VitessBigIntUnsignedTest.java index b8420b2e..699fa95a 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessBigIntUnsignedTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessBigIntUnsignedTest.java @@ -96,6 +96,7 @@ protected void handleInsert(VitessConnectorConfig.BigIntUnsignedHandlingMode mod ReplicationMessage.Operation.INSERT, AnonymousValue.getInstant(), AnonymousValue.getString(), + AnonymousValue.getString(), TestHelper.defaultTableId().toDoubleQuotedString(), TestHelper.TEST_SHARD, null, diff --git a/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java b/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java index 89d49d19..96fe9220 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessChangeRecordEmitterTest.java @@ -48,6 +48,7 @@ public void shouldGetNewColumnValuesFromInsert() { ReplicationMessage.Operation.INSERT, AnonymousValue.getInstant(), AnonymousValue.getString(), + AnonymousValue.getString(), TestHelper.defaultTableId().toDoubleQuotedString(), AnonymousValue.getString(), null, @@ -75,6 +76,7 @@ public void shouldGetOldColumnValuesFromDelete() { ReplicationMessage.Operation.DELETE, AnonymousValue.getInstant(), AnonymousValue.getString(), + AnonymousValue.getString(), TestHelper.defaultTableId().toDoubleQuotedString(), AnonymousValue.getString(), TestHelper.defaultRelationMessageColumns(), @@ -102,6 +104,7 @@ public void shouldGetOldAndNewColumnValuesFromUpdate() { ReplicationMessage.Operation.UPDATE, AnonymousValue.getInstant(), AnonymousValue.getString(), + AnonymousValue.getString(), TestHelper.defaultTableId().toDoubleQuotedString(), AnonymousValue.getString(), TestHelper.defaultRelationMessageColumns(), @@ -126,7 +129,7 @@ public void shouldGetOldAndNewColumnValuesFromUpdate() { public void shouldNotSupportBeginMessage() { // setup fixture ReplicationMessage message = new TransactionalMessage(ReplicationMessage.Operation.BEGIN, AnonymousValue.getString(), AnonymousValue.getInstant(), - AnonymousValue.getString()); + AnonymousValue.getString(), AnonymousValue.getString()); // exercise SUT new VitessChangeRecordEmitter( @@ -142,7 +145,7 @@ public void shouldNotSupportBeginMessage() { public void shouldNotSupportCommitMessage() { // setup fixture ReplicationMessage message = new TransactionalMessage(ReplicationMessage.Operation.COMMIT, AnonymousValue.getString(), AnonymousValue.getInstant(), - AnonymousValue.getString()); + AnonymousValue.getString(), AnonymousValue.getString()); // exercise SUT new VitessChangeRecordEmitter( diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index 33c21b83..21b4d5b5 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -8,6 +8,7 @@ import static io.debezium.connector.vitess.TestHelper.TEST_EMPTY_SHARD_KEYSPACE; import static io.debezium.connector.vitess.TestHelper.TEST_NON_EMPTY_SHARD; import static io.debezium.connector.vitess.TestHelper.TEST_SERVER; +import static io.debezium.connector.vitess.TestHelper.TEST_SHARD; import static io.debezium.connector.vitess.TestHelper.TEST_SHARD1; import static io.debezium.connector.vitess.TestHelper.TEST_SHARD1_EPOCH; import static io.debezium.connector.vitess.TestHelper.TEST_SHARD2; @@ -208,6 +209,112 @@ public void shouldReceiveHeartbeatEventsShardedKeyspace() throws Exception { assertThat(vgtid.getShardGtids().get(0).getShard()).isEqualTo(expectedShard); } + @Test + @FixFor("DBZ-8325") + public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exception { + TestHelper.executeDDL("vitess_create_tables.ddl"); + // startConnector(); + startConnector(config -> config + .with(CommonConnectorConfig.INCLUDE_SCHEMA_CHANGES, true), + false); + assertConnectorIsRunning(); + + String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX); + String dataChangeTopic = String.join(".", + TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX), + TEST_UNSHARDED_KEYSPACE, + "ddl_table"); + + String ddl = "ALTER TABLE ddl_table ADD COLUMN new_column_name INT"; + TestHelper.execute("INSERT INTO ddl_table (id, int_unsigned_col, json_col) VALUES (1, 2, '{\"1\":2}');"); + TestHelper.execute(ddl); + + int expectedDataChangeRecords = 1; + int expectedSchemaChangeRecords = 1; + int expectedTotalRecords = expectedDataChangeRecords + expectedSchemaChangeRecords; + consumer = testConsumer(expectedTotalRecords); + consumer.expects(expectedTotalRecords); + consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); + for (int i = 0; i < expectedTotalRecords; i++) { + SourceRecord record = consumer.remove(); + Struct value = (Struct) record.value(); + Struct source = (Struct) value.get("source"); + assertThat(source.getString("table")).isEqualTo("ddl_table"); + assertThat(source.getString("shard")).isEqualTo(TEST_SHARD); + if (i == 1) { + assertThat(record.topic()).isEqualTo(schemaChangeTopic); + assertThat(value.getString("ddl")).isEqualToIgnoringCase(ddl); + } + else { + assertThat(record.topic()).isEqualTo(dataChangeTopic); + } + } + assertThat(consumer.isEmpty()); + } + + @Test + @FixFor("DBZ-8325") + public void shouldReceiveSchemaEventsShardedBeforeAnyDataEvents() throws Exception { + String keyspace = TEST_SHARDED_KEYSPACE; + String table = keyspace + ".ddl_table"; + TestHelper.executeDDL("vitess_create_tables.ddl", keyspace); + TestHelper.applyVSchema("vitess_vschema.json"); + startConnector(config -> config + .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true), + true); + assertConnectorIsRunning(); + + String schemaChangeTopic = TestHelper.defaultConfig().build().getString(CommonConnectorConfig.TOPIC_PREFIX); + + String addCol = "ALTER TABLE ddl_table ADD COLUMN new_column_name INT"; + String addPartition = "ALTER TABLE ddl_table ADD PARTITION (PARTITION p2 VALUES LESS THAN (2000))"; + String dropPartition = "ALTER TABLE ddl_table DROP PARTITION p0"; + String truncateTable = "TRUNCATE TABLE ddl_table"; + // Put in the fully qualified table name (with keyspace) to ensure we can parse the table name fine + String dropTable = "DROP TABLE test_sharded_keyspace.ddl_table"; + String createTable = "CREATE TABLE test_sharded_keyspace.ddl_table (id BIGINT NOT NULL AUTO_INCREMENT, PRIMARY KEY (id))"; + TestHelper.execute(addCol, TEST_SHARDED_KEYSPACE); + TestHelper.execute(addPartition, TEST_SHARDED_KEYSPACE); + TestHelper.execute(dropPartition, TEST_SHARDED_KEYSPACE); + TestHelper.execute(truncateTable, TEST_SHARDED_KEYSPACE); + TestHelper.execute(dropTable, TEST_SHARDED_KEYSPACE); + TestHelper.execute(createTable, TEST_SHARDED_KEYSPACE); + + // 6 for the changes above + // 2 shards, so 6 * 2 = 12 + int expectedSchemaChangeRecords = 12; + consumer = testConsumer(expectedSchemaChangeRecords); + consumer.expects(expectedSchemaChangeRecords); + consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); + for (int i = 0; i < expectedSchemaChangeRecords; i++) { + SourceRecord record = consumer.remove(); + assertThat(record.topic()).isEqualTo(schemaChangeTopic); + Struct value = (Struct) record.value(); + Struct source = (Struct) value.get("source"); + assertThat(source.getString("table")).isEqualTo("ddl_table"); + assertThat(source.getString("shard")).isIn(List.of(TEST_SHARD1, TEST_SHARD2)); + if (i < 2) { + assertThat(value.getString("ddl")).isEqualToIgnoringCase(addCol); + } + else if (i < 4) { + assertThat(value.getString("ddl")).isEqualToIgnoringCase(addPartition); + } + else if (i < 6) { + assertThat(value.getString("ddl")).isEqualToIgnoringCase(dropPartition); + } + else if (i < 8) { + assertThat(value.getString("ddl")).isEqualToIgnoringCase(truncateTable); + } + else if (i < 10) { + assertThat(value.getString("ddl")).containsIgnoringCase("DROP TABLE"); + } + else if (i < 12) { + assertThat(value.getString("ddl")).containsIgnoringCase("CREATE TABLE"); + } + } + assertThat(consumer.isEmpty()); + } + @Test @FixFor("DBZ-2776") public void shouldReceiveChangesForInsertsWithEnum() throws Exception { @@ -1801,7 +1908,8 @@ public void testMidSnapshotRecoveryLargeTable() throws Exception { assertThat(recordCount < expectedSnapshotRecordsCount).isTrue(); // Assert the total snapshot records are sent after starting consumer = testConsumer(expectedSnapshotRecordsCount, tableInclude); - startConnector(); + startConnector(Function.identity(), false, false, 1, + -1, -1, tableInclude, VitessConnectorConfig.SnapshotMode.INITIAL, TestHelper.TEST_SHARD); consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); for (int i = 1; i <= expectedSnapshotRecordsCount; i++) { diff --git a/src/test/java/io/debezium/connector/vitess/connection/DdlMessageTest.java b/src/test/java/io/debezium/connector/vitess/connection/DdlMessageTest.java new file mode 100644 index 00000000..da36142c --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/connection/DdlMessageTest.java @@ -0,0 +1,59 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.vitess.connection; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.time.Instant; + +import org.junit.Test; + +/** + * Verify the behavior of {@link DdlMessage} + * + * @author Thomas Thornton + */ +public class DdlMessageTest { + + @Test + public void shouldSetQuery() { + String statement = "ALTER TABLE foo RENAME TO bar"; + String keyspace = "keyspace"; + String shard = "-80"; + DdlMessage ddlMessage = new DdlMessage("gtid", Instant.EPOCH, statement, keyspace, shard); + assertThat(ddlMessage.getStatement()).isEqualTo(statement); + } + + @Test + public void shouldSetShard() { + String statement = "ALTER TABLE foo RENAME TO bar"; + String shard = "-80"; + String keyspace = "keyspace"; + DdlMessage ddlMessage = new DdlMessage("gtid", Instant.EPOCH, statement, keyspace, shard); + assertThat(ddlMessage.getShard()).isEqualTo(shard); + } + + @Test + public void shouldSetKeyspace() { + String statement = "ALTER TABLE foo RENAME TO bar"; + String shard = "-80"; + String keyspace = "keyspace"; + DdlMessage ddlMessage = new DdlMessage("gtid", Instant.EPOCH, statement, keyspace, shard); + assertThat(ddlMessage.getKeyspace()).isEqualTo(keyspace); + } + + @Test + public void shouldConvertToString() { + String statement = "ALTER TABLE foo RENAME TO bar"; + String shard = "-80"; + String keyspace = "keyspace"; + ReplicationMessage replicationMessage = new DdlMessage("gtid", Instant.EPOCH, statement, keyspace, shard); + assertThat(replicationMessage.toString()).isEqualTo( + "DdlMessage{transactionId='gtid', keyspace=keyspace, shard=-80, commitTime=1970-01-01T00:00:00Z, statement=ALTER TABLE foo RENAME TO bar, operation=DDL}"); + } + +} diff --git a/src/test/java/io/debezium/connector/vitess/connection/DdlMetadataExtractorTest.java b/src/test/java/io/debezium/connector/vitess/connection/DdlMetadataExtractorTest.java new file mode 100644 index 00000000..eb1a0083 --- /dev/null +++ b/src/test/java/io/debezium/connector/vitess/connection/DdlMetadataExtractorTest.java @@ -0,0 +1,69 @@ +/* + * Copyright Debezium Authors. + * + * Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0 + */ + +package io.debezium.connector.vitess.connection; + +import static org.assertj.core.api.Assertions.assertThat; + +import org.junit.Test; + +import io.debezium.connector.vitess.TestHelper; +import io.debezium.schema.SchemaChangeEvent; + +/** + * @author Thomas Thornton + */ +public class DdlMetadataExtractorTest { + + @Test + public void shouldGetAlterType() { + DdlMessage ddlMessage = new DdlMessage(null, null, "ALTER TABLE foo ADD COLUMN bar", + TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD); + DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage); + assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.ALTER); + } + + @Test + public void shouldGetCreateType() { + DdlMessage ddlMessage = new DdlMessage(null, null, "CREATE TABLE foo", + TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD); + DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage); + assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.CREATE); + } + + @Test + public void shouldGetTruncateType() { + DdlMessage ddlMessage = new DdlMessage(null, null, "TRUNCATE TABLE foo", + TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD); + DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage); + assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.TRUNCATE); + } + + @Test + public void shouldGetTable() { + DdlMessage ddlMessage = new DdlMessage(null, null, "TRUNCATE TABLE foo", + TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD); + DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage); + assertThat(extractor.getTable()).isEqualTo("\"0\".\"test_unsharded_keyspace\".\"foo\""); + } + + @Test + public void shouldGetDropType() { + DdlMessage ddlMessage = new DdlMessage(null, null, "DROP TABLE foo", + TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD); + DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage); + assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.DROP); + } + + @Test + public void shouldGetRenameType() { + DdlMessage ddlMessage = new DdlMessage(null, null, "RENAME TABLE foo TO bar", + TestHelper.TEST_UNSHARDED_KEYSPACE, TestHelper.TEST_SHARD); + DdlMetadataExtractor extractor = new DdlMetadataExtractor(ddlMessage); + assertThat(extractor.getSchemaChangeEventType()).isEqualTo(SchemaChangeEvent.SchemaChangeEventType.ALTER); + } + +} diff --git a/src/test/resources/vitess_create_tables.ddl b/src/test/resources/vitess_create_tables.ddl index c79212e4..8f7b8701 100644 --- a/src/test/resources/vitess_create_tables.ddl +++ b/src/test/resources/vitess_create_tables.ddl @@ -20,6 +20,18 @@ CREATE TABLE numeric_table PRIMARY KEY (id) ); +DROP TABLE IF EXISTS ddl_table; +CREATE TABLE ddl_table +( + id BIGINT NOT NULL AUTO_INCREMENT, + int_unsigned_col INT UNSIGNED DEFAULT 0, + json_col JSON, + PRIMARY KEY (id) +) +PARTITION BY RANGE (id) ( + PARTITION p0 VALUES LESS THAN (1000) +); + DROP TABLE IF EXISTS string_table; CREATE TABLE string_table ( diff --git a/src/test/resources/vitess_vschema.json b/src/test/resources/vitess_vschema.json index 0681d287..47104980 100644 --- a/src/test/resources/vitess_vschema.json +++ b/src/test/resources/vitess_vschema.json @@ -18,6 +18,18 @@ "sequence": "my_seq" } }, + "ddl_table": { + "columnVindexes": [ + { + "column": "id", + "name": "hash" + } + ], + "autoIncrement": { + "column": "id", + "sequence": "my_seq" + } + }, "string_table": { "columnVindexes": [ { From ad8dccbe16ee63371882af96a89e4a314611fb02 Mon Sep 17 00:00:00 2001 From: twthorn Date: Fri, 8 Nov 2024 15:17:22 -0500 Subject: [PATCH 2/3] DBZ-8325 Support override schema change topic & default include schema changes to false --- .../vitess/TableTopicNamingStrategy.java | 37 +++++++++++++++ .../vitess/VitessConnectorConfig.java | 28 +++++++++++ .../vitess/TableTopicNamingStrategyTest.java | 22 +++++++++ .../vitess/VitessConnectorConfigTest.java | 37 +++++++++++++++ .../connector/vitess/VitessConnectorIT.java | 46 ++++++++++++++++++- 5 files changed, 169 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/debezium/connector/vitess/TableTopicNamingStrategy.java b/src/main/java/io/debezium/connector/vitess/TableTopicNamingStrategy.java index b6a437d4..75124a10 100644 --- a/src/main/java/io/debezium/connector/vitess/TableTopicNamingStrategy.java +++ b/src/main/java/io/debezium/connector/vitess/TableTopicNamingStrategy.java @@ -15,18 +15,37 @@ import io.debezium.util.Strings; /** + *

* Topic naming strategy where only the table name is added. This is used to avoid including * the shard which is now part of the catalog of the table ID and would be included if * the DefaultTopicNamingStrategy is being used. + *

+ * Additionally, supports some Vitess-specific configs: + *
    + *
  • + * overrideDataChangeTopicPrefix: in the case of mulitple connectors for the same keyspace, + * a unique `topic.prefix` is required for proper metric reporting, so in order for a consistent topic + * naming convention, the data change topic prefix can be set here (typically shared between connectors of the same + * keyspace + *
  • + *
  • + * overrideSchemaChangeTopic: in the case of multiple connectors for the same keyspace and `include.schema.changes` being enabled, + * this is used to prevent the ddl events from being written to the topic named `topic.prefix`, which is the default behavior. + * The reason why this is necessary is that the `topic.prefix` may include the table name for uniqueness, so it may actually be the name + * of a data change topic. + *
  • + *
*/ public class TableTopicNamingStrategy extends AbstractTopicNamingStrategy { private final String overrideDataChangeTopicPrefix; + private final String overrideSchemaChangeTopic; public TableTopicNamingStrategy(Properties props) { super(props); Configuration config = Configuration.from(props); this.overrideDataChangeTopicPrefix = config.getString(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX); + this.overrideSchemaChangeTopic = config.getString(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC); } public static TableTopicNamingStrategy create(CommonConnectorConfig config) { @@ -44,4 +63,22 @@ public String dataChangeTopic(TableId id) { } return topicNames.computeIfAbsent(id, t -> sanitizedTopicName(topicName)); } + + /** + * Return the schema change topic. There are two cases: + * 1. If override schema change topic is specified - use this as the topic name + * 2. If override schema change topic is not specified - call the super method to get the typical + * schema change topic name. + * + * @return String representing the schema change topic name. + */ + @Override + public String schemaChangeTopic() { + if (!Strings.isNullOrBlank(overrideSchemaChangeTopic)) { + return overrideSchemaChangeTopic; + } + else { + return super.schemaChangeTopic(); + } + } } diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java index 6437cee9..22817fc3 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java @@ -348,6 +348,20 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue + "'false' (the default) omits the fields; " + "'true' converts the field into an implementation dependent binary representation."); + // Needed for backward compatibility, otherwise all upgraded connectors will start publishing schema change events + // by default. + public static final Field INCLUDE_SCHEMA_CHANGES = Field.create("include.schema.changes") + .withDisplayName("Include database schema changes") + .withType(Type.BOOLEAN) + .withGroup(Field.createGroupEntry(Field.Group.CONNECTOR, 0)) + .withWidth(Width.SHORT) + .withImportance(ConfigDef.Importance.MEDIUM) + .withDescription("Whether the connector should publish changes in the database schema to a Kafka topic with " + + "the same name as the database server ID. Each schema change will be recorded using a key that " + + "contains the database name and whose value include logical description of the new schema and optionally the DDL statement(s). " + + "The default is 'true'. This is independent of how the connector internally records database schema history.") + .withDefault(false); + public static final Field OFFSET_STORAGE_PER_TASK = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.per.task") .withDisplayName("Store offsets per task") .withType(Type.BOOLEAN) @@ -368,6 +382,14 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue .withValidation(CommonConnectorConfig::validateTopicName) .withDescription("Overrides the topic.prefix used for the data change topic."); + public static final Field OVERRIDE_SCHEMA_CHANGE_TOPIC = Field.create("override.schema.change.topic") + .withDisplayName("Override schema change topic name") + .withType(Type.STRING) + .withWidth(Width.MEDIUM) + .withImportance(ConfigDef.Importance.LOW) + .withValidation(CommonConnectorConfig::validateTopicName) + .withDescription("Overrides the name of the schema change topic (if not set uses topic.prefx)."); + public static final Field OFFSET_STORAGE_TASK_KEY_GEN = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.task.key.gen") .withDisplayName("Offset storage task key generation number") .withType(Type.INT) @@ -479,6 +501,7 @@ private static int validateTimePrecisionMode(Configuration config, Field field, GRPC_MAX_INBOUND_MESSAGE_SIZE, BINARY_HANDLING_MODE, OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, + OVERRIDE_SCHEMA_CHANGE_TOPIC, SCHEMA_NAME_ADJUSTMENT_MODE, OFFSET_STORAGE_PER_TASK, OFFSET_STORAGE_TASK_KEY_GEN, @@ -553,6 +576,11 @@ public String getConnectorName() { return Module.name(); } + @Override + public boolean isSchemaChangesHistoryEnabled() { + return getConfig().getBoolean(INCLUDE_SCHEMA_CHANGES); + } + @Override public TemporalPrecisionMode getTemporalPrecisionMode() { return TemporalPrecisionMode.parse(getConfig().getString(TIME_PRECISION_MODE)); diff --git a/src/test/java/io/debezium/connector/vitess/TableTopicNamingStrategyTest.java b/src/test/java/io/debezium/connector/vitess/TableTopicNamingStrategyTest.java index b1656036..0b02eb5f 100644 --- a/src/test/java/io/debezium/connector/vitess/TableTopicNamingStrategyTest.java +++ b/src/test/java/io/debezium/connector/vitess/TableTopicNamingStrategyTest.java @@ -52,4 +52,26 @@ public void shouldUseTopicPrefixIfOverrideIsBlank() { assertThat(topicName).isEqualTo("prefix.table"); } + @Test + public void shouldGetOverrideSchemaChangeTopic() { + TableId tableId = new TableId("shard", "keyspace", "table"); + final Properties props = new Properties(); + props.put("topic.prefix", "prefix"); + props.put(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), "override-prefix"); + TopicNamingStrategy strategy = new TableTopicNamingStrategy(props); + String topicName = strategy.schemaChangeTopic(); + assertThat(topicName).isEqualTo("override-prefix"); + } + + @Test + public void shouldUseTopicPrefixIfOverrideSchemaIsBlank() { + TableId tableId = new TableId("shard", "keyspace", "table"); + final Properties props = new Properties(); + props.put("topic.prefix", "prefix"); + props.put(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), ""); + TopicNamingStrategy strategy = new TableTopicNamingStrategy(props); + String topicName = strategy.schemaChangeTopic(); + assertThat(topicName).isEqualTo("prefix"); + } + } diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java index ab583331..b9b8d7a5 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java @@ -75,6 +75,43 @@ public void shouldBlankOverrideTopicPrefixFailValidation() { assertThat(inputs.size()).isEqualTo(1); } + @Test + public void shouldImproperOverrideSchemaTopicPrefixFailValidation() { + Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, "hello@world").build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + List inputs = new ArrayList<>(); + Consumer printConsumer = (input) -> { + inputs.add(input); + }; + connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), printConsumer); + assertThat(inputs.size()).isEqualTo(1); + } + + @Test + public void shouldUseSchemaTopicPrefix() { + Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, + "__debezium-ddl.dev.msgdata.precomputed_channel_summary_partitioned").build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + List inputs = new ArrayList<>(); + Consumer printConsumer = (input) -> { + inputs.add(input); + }; + connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), printConsumer); + assertThat(inputs.size()).isEqualTo(0); + } + + @Test + public void shouldBlankOverrideSchemaTopicPrefixFailValidation() { + Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, "").build(); + VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); + List inputs = new ArrayList<>(); + Consumer printConsumer = (input) -> { + inputs.add(input); + }; + connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), printConsumer); + assertThat(inputs.size()).isEqualTo(1); + } + @Test public void shouldExcludeEmptyShards() { Configuration configuration = TestHelper.defaultConfig().with( diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index 21b4d5b5..0403cbea 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -215,7 +215,7 @@ public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exceptio TestHelper.executeDDL("vitess_create_tables.ddl"); // startConnector(); startConnector(config -> config - .with(CommonConnectorConfig.INCLUDE_SCHEMA_CHANGES, true), + .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true), false); assertConnectorIsRunning(); @@ -252,6 +252,50 @@ public void shouldReceiveSchemaChangeEventAfterDataChangeEvent() throws Exceptio assertThat(consumer.isEmpty()); } + @Test + @FixFor("DBZ-8325") + public void shouldSupportOverrideDataChangeAndSchemaChangeTopics() throws Exception { + TestHelper.executeDDL("vitess_create_tables.ddl"); + // startConnector(); + String overrideDataChangeTopicPrefix = "data.alternate.prefix"; + String overrideSchemaChangeTopic = "schema.alternate.topic"; + startConnector(config -> config + .with(VitessConnectorConfig.TOPIC_NAMING_STRATEGY, TableTopicNamingStrategy.class) + .with(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, overrideDataChangeTopicPrefix) + .with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, overrideSchemaChangeTopic) + .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true), + false); + assertConnectorIsRunning(); + + String dataChangeTopic = String.join(".", overrideDataChangeTopicPrefix, "ddl_table"); + + String ddl = "ALTER TABLE ddl_table ADD COLUMN new_column_name INT"; + TestHelper.execute("INSERT INTO ddl_table (id, int_unsigned_col, json_col) VALUES (1, 2, '{\"1\":2}');"); + TestHelper.execute(ddl); + + int expectedDataChangeRecords = 1; + int expectedSchemaChangeRecords = 1; + int expectedTotalRecords = expectedDataChangeRecords + expectedSchemaChangeRecords; + consumer = testConsumer(expectedTotalRecords); + consumer.expects(expectedTotalRecords); + consumer.await(TestHelper.waitTimeForRecords(), TimeUnit.SECONDS); + for (int i = 0; i < expectedTotalRecords; i++) { + SourceRecord record = consumer.remove(); + Struct value = (Struct) record.value(); + Struct source = (Struct) value.get("source"); + assertThat(source.getString("table")).isEqualTo("ddl_table"); + assertThat(source.getString("shard")).isEqualTo(TEST_SHARD); + if (i == 1) { + assertThat(record.topic()).isEqualTo(overrideSchemaChangeTopic); + assertThat(value.getString("ddl")).isEqualToIgnoringCase(ddl); + } + else { + assertThat(record.topic()).isEqualTo(dataChangeTopic); + } + } + assertThat(consumer.isEmpty()); + } + @Test @FixFor("DBZ-8325") public void shouldReceiveSchemaEventsShardedBeforeAnyDataEvents() throws Exception { From b2a98a1af944a3a32859e9c38ee2d2a20690382d Mon Sep 17 00:00:00 2001 From: twthorn Date: Tue, 12 Nov 2024 15:02:03 -0500 Subject: [PATCH 3/3] DBZ-8325 Refactor configs into vitess TableTopicNamingStrategy --- .../vitess/TableTopicNamingStrategy.java | 44 +++++++++++-- .../vitess/VitessConnectorConfig.java | 18 ------ .../vitess/TableTopicNamingStrategyTest.java | 63 ++++++++++++++++--- .../vitess/VitessConnectorConfigTest.java | 61 ------------------ .../connector/vitess/VitessConnectorIT.java | 4 +- 5 files changed, 97 insertions(+), 93 deletions(-) diff --git a/src/main/java/io/debezium/connector/vitess/TableTopicNamingStrategy.java b/src/main/java/io/debezium/connector/vitess/TableTopicNamingStrategy.java index 75124a10..c1eec204 100644 --- a/src/main/java/io/debezium/connector/vitess/TableTopicNamingStrategy.java +++ b/src/main/java/io/debezium/connector/vitess/TableTopicNamingStrategy.java @@ -7,8 +7,14 @@ import java.util.Properties; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.debezium.config.CommonConnectorConfig; import io.debezium.config.Configuration; +import io.debezium.config.Field; import io.debezium.relational.TableId; import io.debezium.schema.AbstractTopicNamingStrategy; import io.debezium.util.Collect; @@ -38,14 +44,44 @@ */ public class TableTopicNamingStrategy extends AbstractTopicNamingStrategy { - private final String overrideDataChangeTopicPrefix; - private final String overrideSchemaChangeTopic; + private static final Logger LOGGER = LoggerFactory.getLogger(TableTopicNamingStrategy.class); + + public static final Field OVERRIDE_DATA_CHANGE_TOPIC_PREFIX = Field.create("override.data.change.topic.prefix") + .withDisplayName("Override Data Topic prefix") + .withType(ConfigDef.Type.STRING) + .withWidth(ConfigDef.Width.MEDIUM) + .withImportance(ConfigDef.Importance.LOW) + .withValidation(CommonConnectorConfig::validateTopicName) + .withDescription("Overrides the topic.prefix used for the data change topic."); + + public static final Field OVERRIDE_SCHEMA_CHANGE_TOPIC = Field.create("override.schema.change.topic") + .withDisplayName("Override schema change topic name") + .withType(ConfigDef.Type.STRING) + .withWidth(ConfigDef.Width.MEDIUM) + .withImportance(ConfigDef.Importance.LOW) + .withValidation(CommonConnectorConfig::validateTopicName) + .withDescription("Overrides the name of the schema change topic (if not set uses topic.prefx)."); + + private String overrideDataChangeTopicPrefix; + private String overrideSchemaChangeTopic; public TableTopicNamingStrategy(Properties props) { super(props); + } + + @Override + public void configure(Properties props) { + super.configure(props); Configuration config = Configuration.from(props); - this.overrideDataChangeTopicPrefix = config.getString(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX); - this.overrideSchemaChangeTopic = config.getString(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC); + final Field.Set configFields = Field.setOf( + OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, + OVERRIDE_SCHEMA_CHANGE_TOPIC); + if (!config.validateAndRecord(configFields, LOGGER::error)) { + throw new ConnectException("Unable to validate config."); + } + + overrideDataChangeTopicPrefix = config.getString(OVERRIDE_DATA_CHANGE_TOPIC_PREFIX); + overrideSchemaChangeTopic = config.getString(OVERRIDE_SCHEMA_CHANGE_TOPIC); } public static TableTopicNamingStrategy create(CommonConnectorConfig config) { diff --git a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java index 22817fc3..9fae94dc 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java +++ b/src/main/java/io/debezium/connector/vitess/VitessConnectorConfig.java @@ -374,22 +374,6 @@ public static BigIntUnsignedHandlingMode parse(String value, String defaultValue + "'false' (the default) offsets are stored as a single unit under the database name. " + "'true' stores the offsets per task id"); - public static final Field OVERRIDE_DATA_CHANGE_TOPIC_PREFIX = Field.create("override.data.change.topic.prefix") - .withDisplayName("Override Data Topic prefix") - .withType(Type.STRING) - .withWidth(Width.MEDIUM) - .withImportance(ConfigDef.Importance.LOW) - .withValidation(CommonConnectorConfig::validateTopicName) - .withDescription("Overrides the topic.prefix used for the data change topic."); - - public static final Field OVERRIDE_SCHEMA_CHANGE_TOPIC = Field.create("override.schema.change.topic") - .withDisplayName("Override schema change topic name") - .withType(Type.STRING) - .withWidth(Width.MEDIUM) - .withImportance(ConfigDef.Importance.LOW) - .withValidation(CommonConnectorConfig::validateTopicName) - .withDescription("Overrides the name of the schema change topic (if not set uses topic.prefx)."); - public static final Field OFFSET_STORAGE_TASK_KEY_GEN = Field.create(VITESS_CONFIG_GROUP_PREFIX + "offset.storage.task.key.gen") .withDisplayName("Offset storage task key generation number") .withType(Type.INT) @@ -500,8 +484,6 @@ private static int validateTimePrecisionMode(Configuration config, Field field, GRPC_HEADERS, GRPC_MAX_INBOUND_MESSAGE_SIZE, BINARY_HANDLING_MODE, - OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, - OVERRIDE_SCHEMA_CHANGE_TOPIC, SCHEMA_NAME_ADJUSTMENT_MODE, OFFSET_STORAGE_PER_TASK, OFFSET_STORAGE_TASK_KEY_GEN, diff --git a/src/test/java/io/debezium/connector/vitess/TableTopicNamingStrategyTest.java b/src/test/java/io/debezium/connector/vitess/TableTopicNamingStrategyTest.java index 0b02eb5f..cda50ec3 100644 --- a/src/test/java/io/debezium/connector/vitess/TableTopicNamingStrategyTest.java +++ b/src/test/java/io/debezium/connector/vitess/TableTopicNamingStrategyTest.java @@ -7,9 +7,11 @@ package io.debezium.connector.vitess; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; import java.util.Properties; +import org.apache.kafka.connect.errors.ConnectException; import org.junit.Test; import io.debezium.relational.TableId; @@ -34,19 +36,18 @@ public void shouldGetOverrideDataChangeTopic() { final Properties props = new Properties(); props.put("topic.delimiter", "."); props.put("topic.prefix", "prefix"); - props.put(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX.name(), "override-prefix"); + props.put(TableTopicNamingStrategy.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX.name(), "override-prefix"); TopicNamingStrategy strategy = new TableTopicNamingStrategy(props); String topicName = strategy.dataChangeTopic(tableId); assertThat(topicName).isEqualTo("override-prefix.table"); } @Test - public void shouldUseTopicPrefixIfOverrideIsBlank() { + public void shouldUseTopicPrefixIfOverrideIsNotSpecified() { TableId tableId = new TableId("shard", "keyspace", "table"); final Properties props = new Properties(); props.put("topic.delimiter", "."); props.put("topic.prefix", "prefix"); - props.put(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX.name(), ""); TopicNamingStrategy strategy = new TableTopicNamingStrategy(props); String topicName = strategy.dataChangeTopic(tableId); assertThat(topicName).isEqualTo("prefix.table"); @@ -54,24 +55,70 @@ public void shouldUseTopicPrefixIfOverrideIsBlank() { @Test public void shouldGetOverrideSchemaChangeTopic() { - TableId tableId = new TableId("shard", "keyspace", "table"); final Properties props = new Properties(); props.put("topic.prefix", "prefix"); - props.put(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), "override-prefix"); + props.put(TableTopicNamingStrategy.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), "override-prefix"); TopicNamingStrategy strategy = new TableTopicNamingStrategy(props); String topicName = strategy.schemaChangeTopic(); assertThat(topicName).isEqualTo("override-prefix"); } @Test - public void shouldUseTopicPrefixIfOverrideSchemaIsBlank() { - TableId tableId = new TableId("shard", "keyspace", "table"); + public void shouldUseTopicPrefixIfOverrideSchemaIsNotSpecified() { final Properties props = new Properties(); props.put("topic.prefix", "prefix"); - props.put(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), ""); TopicNamingStrategy strategy = new TableTopicNamingStrategy(props); String topicName = strategy.schemaChangeTopic(); assertThat(topicName).isEqualTo("prefix"); } + @Test + public void shouldUseValidationsOfParentClass() { + final Properties props = new Properties(); + props.put("topic.prefix", "invalid?.topic.name"); + assertThatThrownBy(() -> { + new TableTopicNamingStrategy(props); + }).isInstanceOf(ConnectException.class); + } + + @Test + public void shouldThrowExceptionForInvalidOverrideDataChangeTopic() { + final Properties props = new Properties(); + props.put("topic.prefix", "dev.database"); + props.put(TableTopicNamingStrategy.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX.name(), "invalid?.topic.name"); + assertThatThrownBy(() -> { + new TableTopicNamingStrategy(props); + }).isInstanceOf(ConnectException.class); + } + + @Test + public void shouldBlankOverrideTopicPrefixFailValidation() { + final Properties props = new Properties(); + props.put("topic.prefix", "dev.database"); + props.put(TableTopicNamingStrategy.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX.name(), ""); + assertThatThrownBy(() -> { + new TableTopicNamingStrategy(props); + }).isInstanceOf(ConnectException.class); + } + + @Test + public void shouldThrowExceptionForInvalidOverrideSchemaChangeTopic() { + final Properties props = new Properties(); + props.put("topic.prefix", "dev.database"); + props.put(TableTopicNamingStrategy.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), "invalid?.topic.name"); + assertThatThrownBy(() -> { + new TableTopicNamingStrategy(props); + }).isInstanceOf(ConnectException.class); + } + + @Test + public void shouldBlankOverrideSchemaTopicPrefixFailValidation() { + final Properties props = new Properties(); + props.put("topic.prefix", "dev.database"); + props.put(TableTopicNamingStrategy.OVERRIDE_SCHEMA_CHANGE_TOPIC.name(), ""); + assertThatThrownBy(() -> { + new TableTopicNamingStrategy(props); + }).isInstanceOf(ConnectException.class); + } + } diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java index b9b8d7a5..da928a84 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorConfigTest.java @@ -51,67 +51,6 @@ public void shouldGetVitessHeartbeatNoOp() { assertThat(heartbeat).isEqualTo(Heartbeat.DEFAULT_NOOP_HEARTBEAT); } - @Test - public void shouldImproperOverrideTopicPrefixFailValidation() { - Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, "hello@world").build(); - VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); - List inputs = new ArrayList<>(); - Consumer printConsumer = (input) -> { - inputs.add(input); - }; - connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX), printConsumer); - assertThat(inputs.size()).isEqualTo(1); - } - - @Test - public void shouldBlankOverrideTopicPrefixFailValidation() { - Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, "").build(); - VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); - List inputs = new ArrayList<>(); - Consumer printConsumer = (input) -> { - inputs.add(input); - }; - connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX), printConsumer); - assertThat(inputs.size()).isEqualTo(1); - } - - @Test - public void shouldImproperOverrideSchemaTopicPrefixFailValidation() { - Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, "hello@world").build(); - VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); - List inputs = new ArrayList<>(); - Consumer printConsumer = (input) -> { - inputs.add(input); - }; - connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), printConsumer); - assertThat(inputs.size()).isEqualTo(1); - } - - @Test - public void shouldUseSchemaTopicPrefix() { - Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, - "__debezium-ddl.dev.msgdata.precomputed_channel_summary_partitioned").build(); - VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); - List inputs = new ArrayList<>(); - Consumer printConsumer = (input) -> { - inputs.add(input); - }; - connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), printConsumer); - assertThat(inputs.size()).isEqualTo(0); - } - - @Test - public void shouldBlankOverrideSchemaTopicPrefixFailValidation() { - Configuration configuration = TestHelper.defaultConfig().with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, "").build(); - VitessConnectorConfig connectorConfig = new VitessConnectorConfig(configuration); - List inputs = new ArrayList<>(); - Consumer printConsumer = (input) -> { - inputs.add(input); - }; - connectorConfig.validateAndRecord(List.of(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC), printConsumer); - assertThat(inputs.size()).isEqualTo(1); - } - @Test public void shouldExcludeEmptyShards() { Configuration configuration = TestHelper.defaultConfig().with( diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index 0403cbea..5a0d9c8f 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -261,8 +261,8 @@ public void shouldSupportOverrideDataChangeAndSchemaChangeTopics() throws Except String overrideSchemaChangeTopic = "schema.alternate.topic"; startConnector(config -> config .with(VitessConnectorConfig.TOPIC_NAMING_STRATEGY, TableTopicNamingStrategy.class) - .with(VitessConnectorConfig.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, overrideDataChangeTopicPrefix) - .with(VitessConnectorConfig.OVERRIDE_SCHEMA_CHANGE_TOPIC, overrideSchemaChangeTopic) + .with(TableTopicNamingStrategy.OVERRIDE_DATA_CHANGE_TOPIC_PREFIX, overrideDataChangeTopicPrefix) + .with(TableTopicNamingStrategy.OVERRIDE_SCHEMA_CHANGE_TOPIC, overrideSchemaChangeTopic) .with(VitessConnectorConfig.INCLUDE_SCHEMA_CHANGES, true), false); assertConnectorIsRunning();