Skip to content

Commit

Permalink
DBZ-7962 Add heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
twthorn committed Jun 18, 2024
1 parent de62d2e commit aca972a
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ else if (message.getOperation() == ReplicationMessage.Operation.DDL || message.g
// DDL event or OTHER event
offsetContext.rotateVgtid(newVgtid, message.getCommitTime());
}
else if (message.getOperation().equals(ReplicationMessage.Operation.HEARTBEAT)) {
dispatcher.dispatchHeartbeatEvent(partition, offsetContext);
}
else {
// DML event
TableId tableId = VitessDatabaseSchema.parse(message.getTable());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess.connection;

import java.time.Instant;
import java.util.List;

public class HeartbeatMessage implements ReplicationMessage {

private final Instant commitTime;
private final Operation operation;

public HeartbeatMessage(Instant commitTime) {
this.commitTime = commitTime;
this.operation = Operation.HEARTBEAT;
}

@Override
public Operation getOperation() {
return Operation.HEARTBEAT;
}

@Override
public Instant getCommitTime() {
return commitTime;
}

@Override
public String getTransactionId() {
throw new UnsupportedOperationException();
}

@Override
public String getTable() {
throw new UnsupportedOperationException();
}

@Override
public String getShard() {
throw new UnsupportedOperationException();
}

@Override
public List<Column> getOldTupleList() {
throw new UnsupportedOperationException();
}

@Override
public List<Column> getNewTupleList() {
throw new UnsupportedOperationException();
}

@Override
public String toString() {
return "HeartbeatMessage{"
+ "commitTime="
+ commitTime
+ ", operation="
+ operation
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ enum Operation {
BEGIN,
COMMIT,
DDL,
OTHER
OTHER,
HEARTBEAT
}

/** A representation of column value delivered as a part of replication message */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public void processMessage(Binlogdata.VEvent vEvent,
case OTHER:
handleOther(vEvent, processor, newVgtid);
break;
case HEARTBEAT:
handleHeartbeat(vEvent, processor, newVgtid);
case VGTID:
case VERSION:
break;
Expand Down Expand Up @@ -111,6 +113,11 @@ private void handleOther(Binlogdata.VEvent vEvent, ReplicationMessageProcessor p
new OtherMessage(transactionId, eventTimestamp), newVgtid, false);
}

private void handleHeartbeat(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid) throws InterruptedException {
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
processor.process(new HeartbeatMessage(eventTimestamp), newVgtid, false);
}

private void handleBeginMessage(Binlogdata.VEvent vEvent, ReplicationMessageProcessor processor, Vgtid newVgtid)
throws InterruptedException {
Instant eventTimestamp = Instant.ofEpochSecond(vEvent.getTimestamp());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@ public void onNext(Vtgate.VStreamResponse response) {
LOGGER.debug("Received {} VEvents in the VStreamResponse:",
response.getEventsCount());
boolean sendNow = false;
boolean heartbeatReceived = false;
for (VEvent event : response.getEventsList()) {
LOGGER.debug("VEvent: {}", event);
switch (event.getType()) {
Expand Down Expand Up @@ -179,6 +180,11 @@ public void onNext(Vtgate.VStreamResponse response) {
// [VGTID, OTHER]. This is the first response if "current" is used as starting gtid.
sendNow = true;
break;
case HEARTBEAT:
heartbeatReceived = true;
// Mark sendNow true since begin/commit events may not have been received for just heartbeat events
sendNow = true;
break;
}
bufferedEvents.add(event);
}
Expand All @@ -194,7 +200,8 @@ public void onNext(Vtgate.VStreamResponse response) {
if (numResponses > 1) {
LOGGER.debug("Processing multi-response transaction: number of responses is {}", numResponses);
}
if (newVgtid == null) {
// If there is a heartbeat event we do not want to skip (we want to send the heartbeat)
if (newVgtid == null && !heartbeatReceived) {
LOGGER.warn("Skipping because no vgtid is found in buffered event types: {}",
bufferedEvents.stream().map(VEvent::getType).map(Objects::toString).collect(Collectors.joining(", ")));
reset();
Expand Down Expand Up @@ -264,6 +271,7 @@ private void setError(String msg) {

Vtgate.VStreamFlags vStreamFlags = Vtgate.VStreamFlags.newBuilder()
.setStopOnReshard(config.getStopOnReshard())
.setHeartbeatInterval(getHeartbeatSeconds())
.build();
// Add filtering for whitelist tables
Binlogdata.Filter.Builder filterBuilder = Binlogdata.Filter.newBuilder();
Expand All @@ -290,10 +298,22 @@ private void setError(String msg) {
if (filterBuilder.getRulesCount() > 0) {
vstreamBuilder.setFilter(filterBuilder);
}
Vtgate.VStreamRequest request = vstreamBuilder.build();
stub.vStream(
vstreamBuilder.build(),
request,
responseObserver);
LOGGER.info("Started VStream");
LOGGER.info("Started VStream {}", request);
}

private int getHeartbeatSeconds() {
long secondsLong = config.getHeartbeatInterval().toSeconds();
if (secondsLong > Integer.MAX_VALUE) {
LOGGER.warn("Heartbeat interval {} seconds exceeds the maximum value of integer, using max value", secondsLong);
return Integer.MAX_VALUE;
}
else {
return (int) secondsLong;
}
}

private VitessGrpc.VitessStub newStub(ManagedChannel channel) {
Expand Down
20 changes: 20 additions & 0 deletions src/test/java/io/debezium/connector/vitess/VitessConnectorIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,9 @@
import io.debezium.data.Envelope;
import io.debezium.data.VerifyRecord;
import io.debezium.doc.FixFor;
import io.debezium.embedded.AbstractConnectorTest;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.jdbc.TemporalPrecisionMode;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.TableId;
Expand Down Expand Up @@ -148,6 +150,24 @@ public void shouldReceiveChangesForInsertsWithDifferentDataTypes() throws Except
assertInsert(INSERT_SET_TYPE_STMT, schemasAndValuesForSetType(), TestHelper.PK_FIELD);
}

@Test
@FixFor("DBZ-2776")
public void shouldReceiveHeartbeatEvents() throws Exception {
TestHelper.executeDDL("vitess_create_tables.ddl");
startConnector(config -> config.with(
Heartbeat.HEARTBEAT_INTERVAL.name(), 1000),
true);
assertConnectorIsRunning();

String topic = Heartbeat.HEARTBEAT_TOPICS_PREFIX.defaultValueAsString() + "." + TEST_SERVER;
int expectedHeartbeatRecords = 1;
// Sleep for 3 seconds, heartbeat sent every 1 second
Thread.sleep(3 * 1000);

AbstractConnectorTest.SourceRecords records = consumeRecordsByTopic(expectedHeartbeatRecords, 1);
assertThat(records.recordsForTopic(topic).size()).isEqualTo(expectedHeartbeatRecords);
}

@Test
@FixFor("DBZ-2776")
public void shouldReceiveChangesForInsertsWithEnum() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.debezium.connector.vitess.connection.ReplicationMessage;
import io.debezium.connector.vitess.connection.VitessReplicationConnection;
import io.debezium.doc.FixFor;
import io.debezium.heartbeat.Heartbeat;
import io.debezium.junit.logging.LogInterceptor;
import io.debezium.relational.RelationalDatabaseConnectorConfig;
import io.debezium.schema.DefaultTopicNamingStrategy;
Expand Down Expand Up @@ -294,6 +295,69 @@ public void shouldHaveVgtidInResponse() throws Exception {
}
}

@Test
public void shouldSendHeartbeatMessage() throws Exception {
// setup fixture
final VitessConnectorConfig conf = new VitessConnectorConfig(TestHelper.defaultConfig().with(
Heartbeat.HEARTBEAT_INTERVAL, 1000).build());
final VitessDatabaseSchema vitessDatabaseSchema = new VitessDatabaseSchema(
conf, SchemaNameAdjuster.create(), (TopicNamingStrategy) DefaultTopicNamingStrategy.create(conf));

AtomicReference<Throwable> error = new AtomicReference<>();
try (VitessReplicationConnection connection = new VitessReplicationConnection(conf, vitessDatabaseSchema)) {
Vgtid startingVgtid = Vgtid.of(
Binlogdata.VGtid.newBuilder()
.addShardGtids(
Binlogdata.ShardGtid.newBuilder()
.setKeyspace(conf.getKeyspace())
.setShard(conf.getShard().get(0))
.setGtid(Vgtid.CURRENT_GTID)
.build())
.build());

BlockingQueue<MessageAndVgtid> consumedMessages = new ArrayBlockingQueue<>(100);
AtomicBoolean started = new AtomicBoolean(false);
connection.startStreaming(
startingVgtid,
(message, vgtid, isLastRowEventOfTransaction) -> {
if (!started.get()) {
started.set(true);
}
consumedMessages.add(new MessageAndVgtid(message, vgtid));
},
error);
// Since we are using the "current" as the starting position, there is a race here
// if we execute INSERT_STMT before the vstream starts we will never receive the update
// therefore, we wait until the stream is setup and then do the insertion
Awaitility
.await()
.atMost(Duration.ofSeconds(TestHelper.waitTimeForRecords()))
.until(started::get);
consumedMessages.clear();
int expectedNumOfMessages = 1;
List<MessageAndVgtid> messages = awaitMessages(
TestHelper.waitTimeForRecords(),
SECONDS,
expectedNumOfMessages,
() -> {
try {
return consumedMessages.poll(pollTimeoutInMs, TimeUnit.MILLISECONDS);
}
catch (InterruptedException e) {
return null;
}
});

// verify outcome
assertThat(messages.get(0).getMessage().getOperation().name()).isEqualTo("HEARTBEAT");
}
finally {
if (error.get() != null) {
LOGGER.error("Error during streaming", error.get());
}
}
}

@Test
public void shouldSendCommitTimestamp() throws Exception {
// setup fixture
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Copyright Debezium Authors.
*
* Licensed under the Apache Software License version 2.0, available at http://www.apache.org/licenses/LICENSE-2.0
*/

package io.debezium.connector.vitess.connection;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;

import java.time.Instant;

import org.junit.Test;

public class HeartbeatMessageTest {

@Test
public void getOperation() {
ReplicationMessage message = new HeartbeatMessage(Instant.EPOCH);
assertThat(message.getOperation()).isEqualTo(ReplicationMessage.Operation.HEARTBEAT);
}

@Test
public void getCommitTime() {
ReplicationMessage message = new HeartbeatMessage(Instant.EPOCH);
assertThat(message.getCommitTime()).isEqualTo(Instant.EPOCH);
}

@Test
public void getTransactionId() {
ReplicationMessage message = new HeartbeatMessage(Instant.EPOCH);
assertThatThrownBy(() -> message.getTransactionId())
.isInstanceOf(UnsupportedOperationException.class);
}

@Test
public void getTable() {
ReplicationMessage message = new HeartbeatMessage(Instant.EPOCH);
assertThatThrownBy(() -> message.getTable())
.isInstanceOf(UnsupportedOperationException.class);
}

@Test
public void getShard() {
ReplicationMessage message = new HeartbeatMessage(Instant.EPOCH);
assertThatThrownBy(() -> message.getShard())
.isInstanceOf(UnsupportedOperationException.class);
}

@Test
public void getOldTupleList() {
ReplicationMessage message = new HeartbeatMessage(Instant.EPOCH);
assertThatThrownBy(() -> message.getOldTupleList())
.isInstanceOf(UnsupportedOperationException.class);
}

@Test
public void getNewTupleList() {
ReplicationMessage message = new HeartbeatMessage(Instant.EPOCH);
assertThatThrownBy(() -> message.getNewTupleList())
.isInstanceOf(UnsupportedOperationException.class);
}

@Test
public void testToString() {
ReplicationMessage message = new HeartbeatMessage(Instant.EPOCH);
assertThat(message.toString()).isEqualTo("HeartbeatMessage{commitTime=1970-01-01T00:00:00Z, operation=HEARTBEAT}");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,34 @@ public void shouldProcessBeginEvent() throws Exception {
assertThat(processed[0]).isTrue();
}

@Test
public void shouldProcessHeartbeatEvent() throws Exception {
// setup fixture
String expectedShard = "shard";
Binlogdata.VEvent event = Binlogdata.VEvent.newBuilder()
.setType(Binlogdata.VEventType.HEARTBEAT)
.setShard(expectedShard)
.setTimestamp(AnonymousValue.getLong())
.build();
Vgtid newVgtid = Vgtid.of(VgtidTest.VGTID_JSON);

// exercise SUT
final boolean[] processed = { false };
decoder.processMessage(
event,
(message, vgtid, isLastRowEventOfTransaction) -> {
// verify outcome
assertThat(message).isNotNull();
assertThat(message).isInstanceOf(HeartbeatMessage.class);
assertThat(message.getOperation()).isEqualTo(ReplicationMessage.Operation.HEARTBEAT);
processed[0] = true;
},
newVgtid,
false,
false);
assertThat(processed[0]).isTrue();
}

@Test
@FixFor("DBZ-4667")
public void shouldNotProcessBeginEventIfNoVgtid() throws Exception {
Expand Down

0 comments on commit aca972a

Please sign in to comment.