From 1d0e9e0a0d665e082ee4a425fb6d366c3175e569 Mon Sep 17 00:00:00 2001 From: twthorn Date: Fri, 29 Mar 2024 19:07:48 -0500 Subject: [PATCH] DBZ-7698 Fix naming, implement getTransactionInfo --- .../vitess/VitessEventMetadataProvider.java | 16 ++++++++++++---- .../connector/vitess/VitessSchemaFactory.java | 6 +++--- .../txmetadata/VitessEpochProvider.java | 4 ++-- ...va => VitessOrderedTransactionContext.java} | 16 ++++++++-------- .../VitessOrderedTransactionStructMaker.java | 10 +++++----- .../connector/vitess/VitessConnectorIT.java | 4 ++-- .../vitess/VitessOffsetContextTest.java | 14 +++++++------- ...> VitessOrderedTransactionContextTest.java} | 18 +++++++++--------- 8 files changed, 48 insertions(+), 40 deletions(-) rename src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/{VitessTransactionContext.java => VitessOrderedTransactionContext.java} (76%) rename src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/{VitessTransactionContextTest.java => VitessOrderedTransactionContextTest.java} (82%) diff --git a/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java b/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java index 6f996551..ee7b96b4 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java +++ b/src/main/java/io/debezium/connector/vitess/VitessEventMetadataProvider.java @@ -10,9 +10,11 @@ import org.apache.kafka.connect.data.Struct; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionInfo; import io.debezium.data.Envelope; import io.debezium.pipeline.source.spi.EventMetadataProvider; import io.debezium.pipeline.spi.OffsetContext; +import io.debezium.pipeline.txmetadata.TransactionInfo; import io.debezium.spi.schema.DataCollectionId; import io.debezium.util.Collect; @@ -61,9 +63,15 @@ public String getTransactionId( return sourceInfo.getString(SourceInfo.VGTID_KEY); } - // @Override - // public TransactionInfo getTransactionInfo(DataCollectionId source, OffsetContext offset, Object key, Struct value) { - // - // } + @Override + public TransactionInfo getTransactionInfo(DataCollectionId source, OffsetContext offset, Object key, Struct value) { + if (value == null || source == null) { + return null; + } + final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE); + String vgtid = sourceInfo.getString(SourceInfo.VGTID_KEY); + String shard = sourceInfo.getString(SourceInfo.SHARD_KEY); + return new VitessTransactionInfo(vgtid, shard); + } } diff --git a/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java b/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java index c2bceb18..92b9f6c6 100644 --- a/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java +++ b/src/main/java/io/debezium/connector/vitess/VitessSchemaFactory.java @@ -11,7 +11,7 @@ import org.apache.kafka.connect.data.Schema; import org.apache.kafka.connect.data.SchemaBuilder; -import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionContext; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext; import io.debezium.data.Envelope; import io.debezium.pipeline.txmetadata.TransactionStructMaker; import io.debezium.schema.SchemaFactory; @@ -35,8 +35,8 @@ public Schema getOrderedTransactionBlockSchema() { .field(TransactionStructMaker.DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA) .field(TransactionStructMaker.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, Schema.INT64_SCHEMA) .field(TransactionStructMaker.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, Schema.INT64_SCHEMA) - .field(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, Schema.INT64_SCHEMA) - .field(VitessTransactionContext.OFFSET_TRANSACTION_RANK, Schema.STRING_SCHEMA) + .field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, Schema.INT64_SCHEMA) + .field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, Schema.STRING_SCHEMA) .build(); } diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java index b7d27103..ac2e315f 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessEpochProvider.java @@ -42,7 +42,7 @@ else if (gtid.isHostSetSubsetOf(previousGtid)) { public Map store(Map offset) { try { - offset.put(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, MAPPER.writeValueAsString(shardToEpoch)); + offset.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, MAPPER.writeValueAsString(shardToEpoch)); return offset; } catch (JsonProcessingException e) { @@ -52,7 +52,7 @@ public Map store(Map offset) { public void load(Map offsets) { try { - String shardToEpochString = (String) offsets.get(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH); + String shardToEpochString = (String) offsets.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH); if (shardToEpochString != null) { shardToEpoch = MAPPER.readValue(shardToEpochString, new TypeReference>() { }); diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionContext.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java similarity index 76% rename from src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionContext.java rename to src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java index 2cced919..f7a7abcc 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionContext.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContext.java @@ -12,7 +12,7 @@ import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.pipeline.txmetadata.TransactionInfo; -public class VitessTransactionContext extends TransactionContext { +public class VitessOrderedTransactionContext extends TransactionContext { public static final String OFFSET_TRANSACTION_EPOCH = "transaction_epoch"; public static final String OFFSET_TRANSACTION_RANK = "transaction_rank"; protected String previousTransactionId = null; @@ -21,10 +21,10 @@ public class VitessTransactionContext extends TransactionContext { private VitessEpochProvider epochProvider = new VitessEpochProvider(); private VitessRankProvider rankProvider = new VitessRankProvider(); - public VitessTransactionContext() { + public VitessOrderedTransactionContext() { } - public VitessTransactionContext(TransactionContext transactionContext) { + public VitessOrderedTransactionContext(TransactionContext transactionContext) { super(); // Copy fields this.transactionId = transactionContext.transactionId; @@ -38,12 +38,12 @@ public Map store(Map offset) { return epochProvider.store(offset); } - public static VitessTransactionContext load(Map offsets) { + public static VitessOrderedTransactionContext load(Map offsets) { TransactionContext transactionContext = TransactionContext.load(offsets); - VitessTransactionContext vitessTransactionContext = new VitessTransactionContext(transactionContext); - vitessTransactionContext.previousTransactionId = (String) offsets.get(TransactionContext.OFFSET_TRANSACTION_ID); - vitessTransactionContext.epochProvider.load(offsets); - return vitessTransactionContext; + VitessOrderedTransactionContext vitessOrderedTransactionContext = new VitessOrderedTransactionContext(transactionContext); + vitessOrderedTransactionContext.previousTransactionId = (String) offsets.get(TransactionContext.OFFSET_TRANSACTION_ID); + vitessOrderedTransactionContext.epochProvider.load(offsets); + return vitessOrderedTransactionContext; } @Override diff --git a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java index c10e7d33..eea58fa1 100644 --- a/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java +++ b/src/main/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionStructMaker.java @@ -22,14 +22,14 @@ public Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEv } private Struct addOrderMetadata(Struct struct, OffsetContext offsetContext) { - VitessTransactionContext context = getVitessTransactionOrderMetadata(offsetContext); - struct.put(VitessTransactionContext.OFFSET_TRANSACTION_RANK, context.transactionRank.toString()); - struct.put(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, context.transactionEpoch); + VitessOrderedTransactionContext context = getVitessTransactionOrderMetadata(offsetContext); + struct.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, context.transactionRank.toString()); + struct.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, context.transactionEpoch); return struct; } - private VitessTransactionContext getVitessTransactionOrderMetadata(OffsetContext offsetContext) { - return (VitessTransactionContext) offsetContext.getTransactionContext(); + private VitessOrderedTransactionContext getVitessTransactionOrderMetadata(OffsetContext offsetContext) { + return (VitessOrderedTransactionContext) offsetContext.getTransactionContext(); } @Override diff --git a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java index b184bc7b..0126ca0f 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java +++ b/src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java @@ -46,8 +46,8 @@ import io.debezium.config.Configuration; import io.debezium.config.Field; import io.debezium.connector.vitess.connection.VitessReplicationConnection; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext; import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionStructMaker; -import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionContext; import io.debezium.converters.CloudEventsConverterTest; import io.debezium.converters.spi.CloudEventsMaker; import io.debezium.data.Envelope; @@ -423,7 +423,7 @@ public void shouldProvideOrderedTransactionMetadata() throws Exception { TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE); TestHelper.applyVSchema("vitess_vschema.json"); startConnector(config -> config - .with(CommonConnectorConfig.TRANSACTION_CONTEXT, VitessTransactionContext.class) + .with(CommonConnectorConfig.TRANSACTION_CONTEXT, VitessOrderedTransactionContext.class) .with(CommonConnectorConfig.TRANSACTION_STRUCT_MAKER, VitessOrderedTransactionStructMaker.class) .with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true) .with(VitessConnectorConfig.SHARD, "-80,80-"), diff --git a/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java b/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java index 8b390da1..ca4c9033 100644 --- a/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java +++ b/src/test/java/io/debezium/connector/vitess/VitessOffsetContextTest.java @@ -13,7 +13,7 @@ import org.junit.Before; import org.junit.Test; -import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionContext; +import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext; import io.debezium.pipeline.txmetadata.TransactionContext; import io.debezium.util.Clock; import io.debezium.util.Collect; @@ -100,26 +100,26 @@ public void shouldResetToNewVGgtid() { } @Test - public void shouldGetVitessTransactionContext() { + public void shouldGetVitessOrderedTransactionContext() { VitessConnectorConfig config = new VitessConnectorConfig( TestHelper.defaultConfig() - .with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessTransactionContext.class) + .with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessOrderedTransactionContext.class) .build()); VitessOffsetContext.Loader loader = new VitessOffsetContext.Loader(config); Map offsets = Map.of(SourceInfo.VGTID_KEY, VGTID_JSON); VitessOffsetContext context = loader.load(offsets); TransactionContext transactionContext = context.getTransactionContext(); - assertThat(transactionContext).isInstanceOf(VitessTransactionContext.class); + assertThat(transactionContext).isInstanceOf(VitessOrderedTransactionContext.class); } @Test - public void shouldGetInitialVitessTransactionContext() { + public void shouldGetInitialVitessOrderedTransactionContext() { VitessConnectorConfig config = new VitessConnectorConfig( TestHelper.defaultConfig() - .with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessTransactionContext.class) + .with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessOrderedTransactionContext.class) .build()); VitessOffsetContext context = VitessOffsetContext.initialContext(config, Clock.system()); TransactionContext transactionContext = context.getTransactionContext(); - assertThat(transactionContext).isInstanceOf(VitessTransactionContext.class); + assertThat(transactionContext).isInstanceOf(VitessOrderedTransactionContext.class); } } diff --git a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionContextTest.java b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java similarity index 82% rename from src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionContextTest.java rename to src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java index 69977ad5..9685fa69 100644 --- a/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessTransactionContextTest.java +++ b/src/test/java/io/debezium/connector/vitess/pipeline/txmetadata/VitessOrderedTransactionContextTest.java @@ -18,13 +18,13 @@ import io.debezium.connector.vitess.SourceInfo; -public class VitessTransactionContextTest { +public class VitessOrderedTransactionContextTest { private static final Schema sourceStructSchema = SchemaBuilder.struct().field(SourceInfo.VGTID_KEY, Schema.STRING_SCHEMA); @Test public void shouldInit() { - new VitessTransactionContext(); + new VitessOrderedTransactionContext(); } @Test @@ -32,8 +32,8 @@ public void shouldLoad() { String expectedId = null; String expectedEpoch = "{\"-80\": 0}"; Map offsets = Map.of( - VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch); - VitessTransactionContext metadata = new VitessTransactionContext(); + VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch); + VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext(); metadata.load(offsets); assertThat(metadata.previousTransactionId).isEqualTo(expectedId); } @@ -43,7 +43,7 @@ public void shouldLoadWithNull() { String expectedId = null; Long expectedEpoch = 0L; Map offsets = Collections.emptyMap(); - VitessTransactionContext metadata = new VitessTransactionContext(); + VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext(); metadata.load(offsets); assertThat(metadata.previousTransactionId).isEqualTo(expectedId); assertThat(metadata.transactionEpoch).isEqualTo(expectedEpoch); @@ -51,7 +51,7 @@ public void shouldLoadWithNull() { @Test public void shouldUpdateEpoch() { - VitessTransactionContext metadata = new VitessTransactionContext(); + VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext(); String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]"; BigInteger expectedRank = new BigInteger("7"); @@ -75,7 +75,7 @@ public void shouldUpdateEpoch() { @Test public void shouldUpdateRank() { - VitessTransactionContext metadata = new VitessTransactionContext(); + VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext(); String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]"; String expectedShard = "-80"; @@ -92,7 +92,7 @@ public void shouldUpdateRank() { @Test public void shouldStoreOffsets() { - VitessTransactionContext metadata = new VitessTransactionContext(); + VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext(); String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]"; String expectedShard = "-80"; @@ -103,6 +103,6 @@ public void shouldStoreOffsets() { Map offsets = new HashMap(); String expectedEpoch = "{\"-80\":0}"; Map actualOffsets = metadata.store(offsets); - assertThat(actualOffsets.get(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH)).isEqualTo(expectedEpoch); + assertThat(actualOffsets.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH)).isEqualTo(expectedEpoch); } }