Skip to content

Commit

Permalink
[#15718][CDCSDK] Tests to validate safe time update from explicit che…
Browse files Browse the repository at this point in the history
…ckpoint

Summary:
The diff https://phorge.dev.yugabyte.com/D32455 introduced a fix for data loss issue seen
in the QA test [[ http://stress.dev.yugabyte.com/stress_test/98e14778-d944-4548-abbf-b586d33c18cd | test_cdc_lru_nemesis_splitting_postgres_debezium ]] , by fixing the buggy updation of `cdc_sdk_safe_time` in the state
table in case of explicit checkpointing.
This diff adds tests to validate the fix.

Test Plan:
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestSafetimeUpdateFromExplicitCheckPoint
./yb_build.sh --cxx-test integration-tests_cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNoUpdateSafeTimeWithoutSnapshotTime

Reviewers: asrinivasan, skumar, vkushwaha

Reviewed By: asrinivasan

Subscribers: ycdcxcluster

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D32559
  • Loading branch information
Sumukh-Phalgaonkar committed Feb 27, 2024
1 parent 49725c1 commit 1d4ca2b
Showing 1 changed file with 118 additions and 0 deletions.
118 changes: 118 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7719,5 +7719,123 @@ TEST_F(CDCSDKYsqlTest, TestPgPublicationDisabled) {
ASSERT_OK(conn.Execute("create publication pub2 for all tables;"));
}

// This test validates that the checkpoint (both OpId as well as cdc_sdk_safe_time) is not moved
// ahead incorrectly beyond what the client has explicitly acknowledged
TEST_F(CDCSDKYsqlTest, TestSafetimeUpdateFromExplicitCheckPoint) {
ASSERT_OK(SetUpWithParams(1, 1, false));
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0;
auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));
TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));

google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr));
ASSERT_EQ(tablets.size(), 1);

xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStreamWithReplicationSlot());
auto set_resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets, OpId::Min()));
ASSERT_FALSE(set_resp.has_error());

// 0=DDL, 1=INSERT, 2=UPDATE, 3=DELETE, 4=READ, 5=TRUNCATE, 6=BEGIN, 7=COMMIT
const int expected_count[] = {2, 15, 0, 0, 0, 0, 3, 3};
int count[] = {0, 0, 0, 0, 0, 0, 0, 0};

ASSERT_OK(WriteRowsHelper(0, 5, &test_cluster_, true));

// First GetChanges call to consume the first transaction
auto get_changes_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets));
for(auto record : get_changes_resp.cdc_sdk_proto_records()) {
UpdateRecordCount(record, count);
}

ASSERT_OK(WriteRowsHelper(5, 10, &test_cluster_, true));

// Second GetChanges call with from_op_id and explicit checkpoint same as first GetChanges
// response checkpoint. This simulates the situation where Kafka has acknowledged all the records
// in the first transaction
auto explicit_checkpoint = get_changes_resp.cdc_sdk_checkpoint();
explicit_checkpoint.set_snapshot_time(get_changes_resp.safe_hybrid_time());
get_changes_resp = ASSERT_RESULT(GetChangesFromCDCWithExplictCheckpoint(
stream_id, tablets, &get_changes_resp.cdc_sdk_checkpoint(), &explicit_checkpoint));
for (auto record : get_changes_resp.cdc_sdk_proto_records()) {
UpdateRecordCount(record, count);
}

// Third GetChanges call with from_op_id equal to second GetChanges response checkpoint, and
// explicit checkpoint equal to first GetChanges response checkpoint. This
// simulates the situation where connector requests further records but Kafka has not acknowledged
// the second transaction.
get_changes_resp = ASSERT_RESULT(GetChangesFromCDCWithExplictCheckpoint(
stream_id, tablets, &get_changes_resp.cdc_sdk_checkpoint(), &explicit_checkpoint));
for (auto record : get_changes_resp.cdc_sdk_proto_records()) {
UpdateRecordCount(record, count);
}

auto get_tablets_resp = ASSERT_RESULT(GetTabletListToPollForCDC(stream_id, table_id));
ASSERT_EQ(get_tablets_resp.tablet_checkpoint_pairs().size(), 1);

auto cp_from_tablet_list = get_tablets_resp.tablet_checkpoint_pairs()[0].cdc_sdk_checkpoint();

// Fourth GetChanges call with checkpoint and safe_hybrid_time from the response of
// GetTabletListToPoll. This simulates connector restart. If the safe time is properly updated in
// state table then GetChanges should return the records corresponding to the second transaction
// in the response. Also, since we will be calling GetChanges with an OpId different from the last
// streamed OpId for the tablet, the cached schema will be invalidated and we will also receive a
// DDL record in the response.
get_changes_resp = ASSERT_RESULT(GetChangesFromCDC(
stream_id, tablets, &cp_from_tablet_list, 0, cp_from_tablet_list.snapshot_time()));
for (auto record : get_changes_resp.cdc_sdk_proto_records()) {
UpdateRecordCount(record, count);
}

for (int i = 0; i < 8; i++) {
ASSERT_EQ(expected_count[i], count[i]);
}
}

// This test validates that the cdc_sdk_safe_time is moved forward only when the client specifies an
// explicit checkpoint with snapshot_time
TEST_F(CDCSDKYsqlTest, TestNoUpdateSafeTimeWithoutSnapshotTime) {
ASSERT_OK(SetUpWithParams(1, 1, false));
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdc_state_checkpoint_update_interval_ms) = 0;
auto table = ASSERT_RESULT(CreateTable(&test_cluster_, kNamespaceName, kTableName));
TableId table_id = ASSERT_RESULT(GetTableId(&test_cluster_, kNamespaceName, kTableName));

google::protobuf::RepeatedPtrField<master::TabletLocationsPB> tablets;
ASSERT_OK(test_client()->GetTablets(table, 0, &tablets, nullptr));
ASSERT_EQ(tablets.size(), 1);

xrepl::StreamId stream_id = ASSERT_RESULT(CreateDBStreamWithReplicationSlot());
auto set_resp = ASSERT_RESULT(SetCDCCheckpoint(stream_id, tablets, OpId::Min()));
ASSERT_FALSE(set_resp.has_error());

ASSERT_OK(WriteRowsHelper(0, 5, &test_cluster_, true));

auto get_changes_resp = ASSERT_RESULT(GetChangesFromCDC(stream_id, tablets));

ASSERT_OK(WriteRowsHelper(5, 10, &test_cluster_, true));
auto explicit_checkpoint = get_changes_resp.cdc_sdk_checkpoint();
explicit_checkpoint.set_snapshot_time(get_changes_resp.safe_hybrid_time());
auto checkpointed_time = explicit_checkpoint.snapshot_time();

// This GetChanges call would update the cdc_sdk_safe_time in the state table as its explicit
// checkpoint is initialized with snapshot time.
get_changes_resp = ASSERT_RESULT(GetChangesFromCDCWithExplictCheckpoint(
stream_id, tablets, &get_changes_resp.cdc_sdk_checkpoint(), &explicit_checkpoint));
auto row = ASSERT_RESULT(ReadFromCdcStateTable(stream_id, tablets[0].tablet_id()));
ASSERT_NE(row.cdc_sdk_safe_time, HybridTime::kInvalid);
ASSERT_EQ(row.cdc_sdk_safe_time, HybridTime::FromPB(checkpointed_time));

ASSERT_OK(WriteRowsHelper(10, 15, &test_cluster_, true));

// This GetChanges call will not update the cdc_sdk_safe_time in the state table as its explicit
// checkpoint is not initialized with snapshot time.
get_changes_resp = ASSERT_RESULT(GetChangesFromCDCWithExplictCheckpoint(
stream_id, tablets, &get_changes_resp.cdc_sdk_checkpoint(),
&get_changes_resp.cdc_sdk_checkpoint()));
row = ASSERT_RESULT(ReadFromCdcStateTable(stream_id, tablets[0].tablet_id()));
ASSERT_NE(row.cdc_sdk_safe_time, HybridTime::kInvalid);
ASSERT_EQ(row.cdc_sdk_safe_time, HybridTime::FromPB(checkpointed_time));
}

} // namespace cdc
} // namespace yb

0 comments on commit 1d4ca2b

Please sign in to comment.