Skip to content

Commit

Permalink
[#23278] CDCSDK: Handle non-eligible tables cleanup with drop table w…
Browse files Browse the repository at this point in the history
…hile loading CDC stream

Summary:
When a table present under a CDC stream is dropped, it is removed from the CDC stream metadata by a background thread.
Suppose before the background thread could cleanup, there was a master restart or a master leadership change. On either of these scenarios, while loading the CDC streams, we check all tables present in the CDC stream metadata for ineligibility. Table schema is one of the objects that is scanned while checking for ineligibility. To get the table schema, we fetch the `TableInfo` object from master. This step was leading to a master crash as we receive a nullptr while fetching TableInfo since the table has been dropped.
Jira: DB-12205

Test Plan: ./yb_build.sh --cxx-test cdcsdk_ysql-test --gtest_filter CDCSDKYsqlTest.TestNonEligibleTablesCleanupWhenDropTableCleanupIsDisabled

Reviewers: hsunder, asrinivasan, stiwary, skumar

Reviewed By: skumar

Subscribers: ybase

Tags: #jenkins-ready

Differential Revision: https://phorge.dev.yugabyte.com/D37053
  • Loading branch information
siddharth2411 committed Aug 5, 2024
1 parent 50931bf commit 64e1bf8
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 21 deletions.
56 changes: 56 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -9287,5 +9287,61 @@ TEST_F(CDCSDKYsqlTest, TestChildTabletsOfNonEligibleTableDoNotGetAddedToConsiste
/* use_consistent_snapshot_stream */ true);
}

TEST_F(
CDCSDKYsqlTest,
YB_DISABLE_TEST_IN_TSAN(TestNonEligibleTablesCleanupWhenDropTableCleanupIsDisabled)) {
ANNOTATE_UNPROTECTED_WRITE(FLAGS_cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream) = true;
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_disable_drop_table_cleanup) = true;
// Setup cluster.
ASSERT_OK(SetUpWithParams(3, 3, false));
const vector<string> table_list_suffix = {"_1", "_2", "_3"};
const int kNumTables = 3;
vector<YBTableName> table(kNumTables);
int idx = 0;
vector<google::protobuf::RepeatedPtrField<master::TabletLocationsPB>> tablets(kNumTables);

while (idx < 3) {
table[idx] = ASSERT_RESULT(CreateTable(
&test_cluster_, kNamespaceName, kTableName, 1, true, false, 0, true,
table_list_suffix[idx]));
ASSERT_OK(test_client()->GetTablets(
table[idx], 0, &tablets[idx], /* partition_list_version = */ nullptr));
ASSERT_OK(WriteEnumsRows(
0 /* start */, 100 /* end */, &test_cluster_, table_list_suffix[idx], kNamespaceName,
kTableName));
idx += 1;
}

auto stream_id = ASSERT_RESULT(CreateConsistentSnapshotStream());
std::unordered_set<std::string> expected_table_ids = {
table[0].table_id(), table[1].table_id(), table[2].table_id()};
VerifyTablesInStreamMetadata(stream_id, expected_table_ids, "Waiting for stream metadata.");

LOG(INFO) << "Dropping table: " << Format("$0$1", kTableName, table_list_suffix[0]);
DropTable(&test_cluster_, Format("$0$1", kTableName, table_list_suffix[0]).c_str());
// Stream metadata wouldnt be cleaned up since the codepath is disabled via
// 'TEST_cdcsdk_disable_drop_table_cleanup' flag. Therefore all 3 tables are expected to be
// present in stream metadata.
SleepFor(MonoDelta::FromSeconds(3));
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids, "Waiting for stream metadata after drop table.");

// On loading of CDC stream after a master leader restart, presence of non-eligible tables in CDC
// stream will be checked.
auto leader_master = ASSERT_RESULT(test_cluster_.mini_cluster_->GetLeaderMiniMaster());
ASSERT_OK(leader_master->Restart());
LOG(INFO) << "Master Restarted";
SleepFor(MonoDelta::FromSeconds(5));

// Enable bg threads to cleanup CDC stream metadata for dropped tables.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_TEST_cdcsdk_disable_drop_table_cleanup) = false;

// Verify the dropped table has been removed from stream metadata after enabling the cleanup.
expected_table_ids.erase(table[0].table_id());
VerifyTablesInStreamMetadata(
stream_id, expected_table_ids,
"Waiting for GetDBStreamInfo post metadata cleanup after restart.");
}

} // namespace cdc
} // namespace yb
2 changes: 2 additions & 0 deletions src/yb/integration-tests/cdcsdk_ysql_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ DECLARE_bool(TEST_cdcsdk_skip_updating_cdc_state_entries_on_table_removal);
DECLARE_bool(TEST_cdcsdk_add_indexes_to_stream);
DECLARE_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream);
DECLARE_bool(TEST_cdcsdk_skip_stream_active_check);
DECLARE_bool(TEST_cdcsdk_disable_drop_table_cleanup);

namespace yb {

using client::YBClient;
Expand Down
51 changes: 30 additions & 21 deletions src/yb/master/xrepl_catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ DEFINE_RUNTIME_bool(cdcsdk_enable_cleanup_of_non_eligible_tables_from_stream, fa
"materialised view etc. in their stream metadata and these tables will be marked for removal "
"by catalog manager background thread.");

DEFINE_test_flag(bool, cdcsdk_disable_drop_table_cleanup, false,
"When enabled, cleanup of dropped tables from CDC streams will be skipped.");

DEFINE_RUNTIME_AUTO_bool(cdcsdk_enable_identification_of_non_eligible_tables,
kLocalPersisted,
false,
Expand Down Expand Up @@ -1809,27 +1812,32 @@ void CatalogManager::FindAllNonEligibleTablesInCDCSDKStream(
for (const auto& table_id : table_ids) {
if (!user_table_ids.contains(table_id)) {
auto table_info = GetTableInfoUnlocked(table_id);
Schema schema;
Status status = table_info->GetSchema(&schema);
if (!status.ok()) {
LOG_WITH_FUNC(WARNING) << "Error while getting schema for table: " << table_info->name();
// Skip this table for now, it will be revisited for removal on master restart/master leader
// change.
continue;
}
if (table_info) {
Schema schema;
Status status = table_info->GetSchema(&schema);
if (!status.ok()) {
LOG_WITH_FUNC(WARNING) << "Error while getting schema for table: " << table_info->name();
// Skip this table for now, it will be revisited for removal on master restart/master
// leader change.
continue;
}

// Re-confirm this table is not meant to be part of a CDC stream.
if (!IsTableEligibleForCDCSDKStream(table_info, schema)) {
LOG(INFO) << "Found a non-eligible table: " << table_info->id()
<< ", for stream: " << stream_id;
LockGuard lock(cdcsdk_non_eligible_table_mutex_);
namespace_to_cdcsdk_non_eligible_table_map_[table_info->namespace_id()].insert(
table_info->id());
// Re-confirm this table is not meant to be part of a CDC stream.
if (!IsTableEligibleForCDCSDKStream(table_info, schema)) {
LOG(INFO) << "Found a non-eligible table: " << table_info->id()
<< ", for stream: " << stream_id;
LockGuard lock(cdcsdk_non_eligible_table_mutex_);
namespace_to_cdcsdk_non_eligible_table_map_[table_info->namespace_id()].insert(
table_info->id());
} else {
// Ideally we are not expected to enter the else clause.
LOG(WARNING) << "Found table " << table_id << " in metadata of stream " << stream_id
<< " that is not present in the eligible list of tables "
"from the namespace for CDC";
}
} else {
// Ideally we are not expected to enter the else clause.
LOG(WARNING) << "Found table " << table_id << " in metadata of stream " << stream_id
<< " that is not present in the eligible list of tables "
"from the namespace for CDC";
LOG(INFO) << "Found table " << table_id << " in stream " << stream_id
<< " metadata that is not present in master.";
}
}
}
Expand Down Expand Up @@ -4088,8 +4096,9 @@ void CatalogManager::RunXReplBgTasks(const LeaderEpoch& epoch) {
// Clean up Failed Replication Bootstrap on the Consumer.
WARN_NOT_OK(ClearFailedReplicationBootstrap(), "Failed Clearing Failed Replication Bootstrap");

WARN_NOT_OK(
CleanUpCDCSDKStreamsMetadata(epoch), "Failed Cleanup CDCSDK Streams Metadata");
if (!FLAGS_TEST_cdcsdk_disable_drop_table_cleanup) {
WARN_NOT_OK(CleanUpCDCSDKStreamsMetadata(epoch), "Failed Cleanup CDCSDK Streams Metadata");
}

// Restart xCluster and CDCSDK parent tablet deletion bg task.
StartXReplParentTabletDeletionTaskIfStopped();
Expand Down

0 comments on commit 64e1bf8

Please sign in to comment.