Skip to content

Commit

Permalink
DBZ-7698 Fix naming, implement getTransactionInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Mar 30, 2024
1 parent 5a02b29 commit 1d0e9e0
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,11 @@

import org.apache.kafka.connect.data.Struct;

import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionInfo;
import io.debezium.data.Envelope;
import io.debezium.pipeline.source.spi.EventMetadataProvider;
import io.debezium.pipeline.spi.OffsetContext;
import io.debezium.pipeline.txmetadata.TransactionInfo;
import io.debezium.spi.schema.DataCollectionId;
import io.debezium.util.Collect;

Expand Down Expand Up @@ -61,9 +63,15 @@ public String getTransactionId(
return sourceInfo.getString(SourceInfo.VGTID_KEY);
}

// @Override
// public TransactionInfo getTransactionInfo(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
//
// }
@Override
public TransactionInfo getTransactionInfo(DataCollectionId source, OffsetContext offset, Object key, Struct value) {
if (value == null || source == null) {
return null;
}
final Struct sourceInfo = value.getStruct(Envelope.FieldName.SOURCE);
String vgtid = sourceInfo.getString(SourceInfo.VGTID_KEY);
String shard = sourceInfo.getString(SourceInfo.SHARD_KEY);
return new VitessTransactionInfo(vgtid, shard);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;

import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionContext;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext;
import io.debezium.data.Envelope;
import io.debezium.pipeline.txmetadata.TransactionStructMaker;
import io.debezium.schema.SchemaFactory;
Expand All @@ -35,8 +35,8 @@ public Schema getOrderedTransactionBlockSchema() {
.field(TransactionStructMaker.DEBEZIUM_TRANSACTION_ID_KEY, Schema.STRING_SCHEMA)
.field(TransactionStructMaker.DEBEZIUM_TRANSACTION_TOTAL_ORDER_KEY, Schema.INT64_SCHEMA)
.field(TransactionStructMaker.DEBEZIUM_TRANSACTION_DATA_COLLECTION_ORDER_KEY, Schema.INT64_SCHEMA)
.field(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, Schema.INT64_SCHEMA)
.field(VitessTransactionContext.OFFSET_TRANSACTION_RANK, Schema.STRING_SCHEMA)
.field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, Schema.INT64_SCHEMA)
.field(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, Schema.STRING_SCHEMA)
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ else if (gtid.isHostSetSubsetOf(previousGtid)) {

public Map<String, Object> store(Map<String, Object> offset) {
try {
offset.put(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, MAPPER.writeValueAsString(shardToEpoch));
offset.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, MAPPER.writeValueAsString(shardToEpoch));
return offset;
}
catch (JsonProcessingException e) {
Expand All @@ -52,7 +52,7 @@ public Map<String, Object> store(Map<String, Object> offset) {

public void load(Map<String, ?> offsets) {
try {
String shardToEpochString = (String) offsets.get(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH);
String shardToEpochString = (String) offsets.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH);
if (shardToEpochString != null) {
shardToEpoch = MAPPER.readValue(shardToEpochString, new TypeReference<Map<String, Long>>() {
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.pipeline.txmetadata.TransactionInfo;

public class VitessTransactionContext extends TransactionContext {
public class VitessOrderedTransactionContext extends TransactionContext {
public static final String OFFSET_TRANSACTION_EPOCH = "transaction_epoch";
public static final String OFFSET_TRANSACTION_RANK = "transaction_rank";
protected String previousTransactionId = null;
Expand All @@ -21,10 +21,10 @@ public class VitessTransactionContext extends TransactionContext {
private VitessEpochProvider epochProvider = new VitessEpochProvider();
private VitessRankProvider rankProvider = new VitessRankProvider();

public VitessTransactionContext() {
public VitessOrderedTransactionContext() {
}

public VitessTransactionContext(TransactionContext transactionContext) {
public VitessOrderedTransactionContext(TransactionContext transactionContext) {
super();
// Copy fields
this.transactionId = transactionContext.transactionId;
Expand All @@ -38,12 +38,12 @@ public Map<String, Object> store(Map<String, Object> offset) {
return epochProvider.store(offset);
}

public static VitessTransactionContext load(Map<String, ?> offsets) {
public static VitessOrderedTransactionContext load(Map<String, ?> offsets) {
TransactionContext transactionContext = TransactionContext.load(offsets);
VitessTransactionContext vitessTransactionContext = new VitessTransactionContext(transactionContext);
vitessTransactionContext.previousTransactionId = (String) offsets.get(TransactionContext.OFFSET_TRANSACTION_ID);
vitessTransactionContext.epochProvider.load(offsets);
return vitessTransactionContext;
VitessOrderedTransactionContext vitessOrderedTransactionContext = new VitessOrderedTransactionContext(transactionContext);
vitessOrderedTransactionContext.previousTransactionId = (String) offsets.get(TransactionContext.OFFSET_TRANSACTION_ID);
vitessOrderedTransactionContext.epochProvider.load(offsets);
return vitessOrderedTransactionContext;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ public Struct prepareTxStruct(OffsetContext offsetContext, long dataCollectionEv
}

private Struct addOrderMetadata(Struct struct, OffsetContext offsetContext) {
VitessTransactionContext context = getVitessTransactionOrderMetadata(offsetContext);
struct.put(VitessTransactionContext.OFFSET_TRANSACTION_RANK, context.transactionRank.toString());
struct.put(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, context.transactionEpoch);
VitessOrderedTransactionContext context = getVitessTransactionOrderMetadata(offsetContext);
struct.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_RANK, context.transactionRank.toString());
struct.put(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, context.transactionEpoch);
return struct;
}

private VitessTransactionContext getVitessTransactionOrderMetadata(OffsetContext offsetContext) {
return (VitessTransactionContext) offsetContext.getTransactionContext();
private VitessOrderedTransactionContext getVitessTransactionOrderMetadata(OffsetContext offsetContext) {
return (VitessOrderedTransactionContext) offsetContext.getTransactionContext();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
import io.debezium.config.Configuration;
import io.debezium.config.Field;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionStructMaker;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionContext;
import io.debezium.converters.CloudEventsConverterTest;
import io.debezium.converters.spi.CloudEventsMaker;
import io.debezium.data.Envelope;
Expand Down Expand Up @@ -423,7 +423,7 @@ public void shouldProvideOrderedTransactionMetadata() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl", TEST_SHARDED_KEYSPACE);
TestHelper.applyVSchema("vitess_vschema.json");
startConnector(config -> config
.with(CommonConnectorConfig.TRANSACTION_CONTEXT, VitessTransactionContext.class)
.with(CommonConnectorConfig.TRANSACTION_CONTEXT, VitessOrderedTransactionContext.class)
.with(CommonConnectorConfig.TRANSACTION_STRUCT_MAKER, VitessOrderedTransactionStructMaker.class)
.with(CommonConnectorConfig.PROVIDE_TRANSACTION_METADATA, true)
.with(VitessConnectorConfig.SHARD, "-80,80-"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import org.junit.Before;
import org.junit.Test;

import io.debezium.connector.vitess.pipeline.txmetadata.VitessTransactionContext;
import io.debezium.connector.vitess.pipeline.txmetadata.VitessOrderedTransactionContext;
import io.debezium.pipeline.txmetadata.TransactionContext;
import io.debezium.util.Clock;
import io.debezium.util.Collect;
Expand Down Expand Up @@ -100,26 +100,26 @@ public void shouldResetToNewVGgtid() {
}

@Test
public void shouldGetVitessTransactionContext() {
public void shouldGetVitessOrderedTransactionContext() {
VitessConnectorConfig config = new VitessConnectorConfig(
TestHelper.defaultConfig()
.with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessTransactionContext.class)
.with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessOrderedTransactionContext.class)
.build());
VitessOffsetContext.Loader loader = new VitessOffsetContext.Loader(config);
Map offsets = Map.of(SourceInfo.VGTID_KEY, VGTID_JSON);
VitessOffsetContext context = loader.load(offsets);
TransactionContext transactionContext = context.getTransactionContext();
assertThat(transactionContext).isInstanceOf(VitessTransactionContext.class);
assertThat(transactionContext).isInstanceOf(VitessOrderedTransactionContext.class);
}

@Test
public void shouldGetInitialVitessTransactionContext() {
public void shouldGetInitialVitessOrderedTransactionContext() {
VitessConnectorConfig config = new VitessConnectorConfig(
TestHelper.defaultConfig()
.with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessTransactionContext.class)
.with(VitessConnectorConfig.TRANSACTION_CONTEXT, VitessOrderedTransactionContext.class)
.build());
VitessOffsetContext context = VitessOffsetContext.initialContext(config, Clock.system());
TransactionContext transactionContext = context.getTransactionContext();
assertThat(transactionContext).isInstanceOf(VitessTransactionContext.class);
assertThat(transactionContext).isInstanceOf(VitessOrderedTransactionContext.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,22 @@

import io.debezium.connector.vitess.SourceInfo;

public class VitessTransactionContextTest {
public class VitessOrderedTransactionContextTest {

private static final Schema sourceStructSchema = SchemaBuilder.struct().field(SourceInfo.VGTID_KEY, Schema.STRING_SCHEMA);

@Test
public void shouldInit() {
new VitessTransactionContext();
new VitessOrderedTransactionContext();
}

@Test
public void shouldLoad() {
String expectedId = null;
String expectedEpoch = "{\"-80\": 0}";
Map offsets = Map.of(
VitessTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch);
VitessTransactionContext metadata = new VitessTransactionContext();
VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH, expectedEpoch);
VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext();
metadata.load(offsets);
assertThat(metadata.previousTransactionId).isEqualTo(expectedId);
}
Expand All @@ -43,15 +43,15 @@ public void shouldLoadWithNull() {
String expectedId = null;
Long expectedEpoch = 0L;
Map offsets = Collections.emptyMap();
VitessTransactionContext metadata = new VitessTransactionContext();
VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext();
metadata.load(offsets);
assertThat(metadata.previousTransactionId).isEqualTo(expectedId);
assertThat(metadata.transactionEpoch).isEqualTo(expectedEpoch);
}

@Test
public void shouldUpdateEpoch() {
VitessTransactionContext metadata = new VitessTransactionContext();
VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext();

String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]";
BigInteger expectedRank = new BigInteger("7");
Expand All @@ -75,7 +75,7 @@ public void shouldUpdateEpoch() {

@Test
public void shouldUpdateRank() {
VitessTransactionContext metadata = new VitessTransactionContext();
VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext();

String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]";
String expectedShard = "-80";
Expand All @@ -92,7 +92,7 @@ public void shouldUpdateRank() {

@Test
public void shouldStoreOffsets() {
VitessTransactionContext metadata = new VitessTransactionContext();
VitessOrderedTransactionContext metadata = new VitessOrderedTransactionContext();

String expectedTxId = "[{\"keyspace\": \"foo\", \"gtid\": \"host1:1-3,host2:3-4\", \"shard\": \"-80\"}]";
String expectedShard = "-80";
Expand All @@ -103,6 +103,6 @@ public void shouldStoreOffsets() {
Map offsets = new HashMap();
String expectedEpoch = "{\"-80\":0}";
Map actualOffsets = metadata.store(offsets);
assertThat(actualOffsets.get(VitessTransactionContext.OFFSET_TRANSACTION_EPOCH)).isEqualTo(expectedEpoch);
assertThat(actualOffsets.get(VitessOrderedTransactionContext.OFFSET_TRANSACTION_EPOCH)).isEqualTo(expectedEpoch);
}
}

0 comments on commit 1d0e9e0

Please sign in to comment.