Skip to content

Commit

Permalink
[#23046] xCluster: Remove ns_replication from the code
Browse files Browse the repository at this point in the history
Summary:
Namespace replication is deprecated and not used. It has been replaced by DB Scoped replication.

**Upgrade/Rollback safety:**
NS Replication was never used

Fixes #23046
Jira: DB-11978

Test Plan: Jenkins

Reviewers: slingam, jhe, xCluster

Reviewed By: jhe

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D36228
  • Loading branch information
hari90 committed Jun 28, 2024
1 parent 7c8343d commit 58c8d4e
Show file tree
Hide file tree
Showing 12 changed files with 3 additions and 836 deletions.
124 changes: 0 additions & 124 deletions src/yb/integration-tests/xcluster/xcluster-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2877,130 +2877,6 @@ TEST_P(XClusterTest, DeleteTableChecksCQL) {
}
}

TEST_P(XClusterTest, SetupNSUniverseReplicationExtraConsumerTables) {
// Initial setup: create 2 tables on each side.
constexpr int kNTabletsPerTable = 3;
std::vector<uint32_t> table_vector = {kNTabletsPerTable, kNTabletsPerTable};
ASSERT_OK(SetUpWithParams(table_vector, 1));

// Create 2 more consumer tables.
for (int i = 2; i < 4; i++) {
auto t = ASSERT_RESULT(CreateTable(
consumer_client(), namespace_name, Format("test_table_$0", i), kNTabletsPerTable));
consumer_tables_.push_back({});
ASSERT_OK(consumer_client()->OpenTable(t, &consumer_tables_.back()));
}

// Setup NS universe replication. Only the first 2 consumer tables will be replicated.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ns_replication_sync_retry_secs) = 1;
ASSERT_OK(SetupNSUniverseReplication(
producer_cluster(), consumer_cluster(), consumer_client(), kReplicationGroupId,
namespace_name, YQLDatabase::YQL_DATABASE_CQL));
ASSERT_OK(VerifyNSUniverseReplication(
consumer_cluster(), consumer_client(), kReplicationGroupId,
narrow_cast<int>(producer_tables_.size())));

// Create the additional 2 tables on producer. Verify that they are added automatically.
for (int i = 2; i < 4; i++) {
auto t = ASSERT_RESULT(CreateTable(
producer_client(), namespace_name, Format("test_table_$0", i), kNTabletsPerTable));
producer_tables_.push_back({});
ASSERT_OK(producer_client()->OpenTable(t, &producer_tables_.back()));
}
ASSERT_OK(VerifyNSUniverseReplication(
consumer_cluster(), consumer_client(), kReplicationGroupId,
narrow_cast<int>(consumer_tables_.size())));

// Write some data and verify replication.
for (size_t i = 0; i < producer_tables_.size(); i++) {
ASSERT_OK(InsertRowsAndVerify(0, narrow_cast<int>(10 * (i + 1)), producer_tables_[i]));
}
ASSERT_OK(DeleteUniverseReplication());
}

TEST_P(XClusterTest, SetupNSUniverseReplicationExtraProducerTables) {
// Initial setup: create 2 tables on each side.
constexpr int kNTabletsPerTable = 3;
std::vector<uint32_t> table_vector = {kNTabletsPerTable, kNTabletsPerTable};
ASSERT_OK(SetUpWithParams(table_vector, 1));

// Create 2 more producer tables.
for (int i = 2; i < 4; i++) {
auto t = ASSERT_RESULT(CreateTable(
producer_client(), namespace_name, Format("test_table_$0", i), kNTabletsPerTable));
producer_tables_.push_back({});
ASSERT_OK(producer_client()->OpenTable(t, &producer_tables_.back()));
}

// Setup NS universe replication. Only the first 2 producer tables will be replicated.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ns_replication_sync_backoff_secs) = 1;
ASSERT_OK(SetupNSUniverseReplication(
producer_cluster(), consumer_cluster(), consumer_client(), kReplicationGroupId,
namespace_name, YQLDatabase::YQL_DATABASE_CQL));
ASSERT_OK(VerifyNSUniverseReplication(
consumer_cluster(), consumer_client(), kReplicationGroupId,
narrow_cast<int>(consumer_tables_.size())));

// Create the additional 2 tables on consumer. Verify that they are added automatically.
for (int i = 2; i < 4; i++) {
auto t = ASSERT_RESULT(CreateTable(
consumer_client(), namespace_name, Format("test_table_$0", i), kNTabletsPerTable));
consumer_tables_.push_back({});
ASSERT_OK(consumer_client()->OpenTable(t, &consumer_tables_.back()));
}
ASSERT_OK(VerifyNSUniverseReplication(
consumer_cluster(), consumer_client(), kReplicationGroupId,
narrow_cast<int>(producer_tables_.size())));

// Write some data and verify replication.
for (size_t i = 0; i < producer_tables_.size(); i++) {
ASSERT_OK(InsertRowsAndVerify(0, narrow_cast<int>(10 * (i + 1)), producer_tables_[i]));
}
ASSERT_OK(DeleteUniverseReplication());
}

TEST_P(XClusterTest, SetupNSUniverseReplicationTwoNamespace) {
// Create 2 tables in one namespace.
constexpr int kNTabletsPerTable = 1;
std::vector<uint32_t> table_vector = {kNTabletsPerTable, kNTabletsPerTable};
ASSERT_OK(SetUpWithParams(table_vector, 1));

// Create 2 tables in another namespace.
string kNamespaceName2 = "test_namespace_2";
std::vector<std::shared_ptr<client::YBTable>> producer_tables;
std::vector<std::shared_ptr<client::YBTable>> consumer_tables;
producer_tables.reserve(2);
consumer_tables.reserve(2);
for (int i = 0; i < 2; i++) {
auto ptable = ASSERT_RESULT(CreateTable(
producer_client(), kNamespaceName2, Format("test_table_$0", i), kNTabletsPerTable));
producer_tables.push_back({});
ASSERT_OK(producer_client()->OpenTable(ptable, &producer_tables.back()));

auto ctable = ASSERT_RESULT(CreateTable(
consumer_client(), kNamespaceName2, Format("test_table_$0", i), kNTabletsPerTable));
consumer_tables.push_back({});
ASSERT_OK(consumer_client()->OpenTable(ctable, &consumer_tables.back()));
}

// Setup NS universe replication for the second namespace. Verify that the tables under the
// first namespace will not be added to the replication.
ANNOTATE_UNPROTECTED_WRITE(FLAGS_ns_replication_sync_backoff_secs) = 1;
ASSERT_OK(SetupNSUniverseReplication(
producer_cluster(), consumer_cluster(), consumer_client(), kReplicationGroupId,
kNamespaceName2, YQLDatabase::YQL_DATABASE_CQL));
SleepFor(MonoDelta::FromSeconds(5)); // Let the bg thread run a few times.
ASSERT_OK(VerifyNSUniverseReplication(
consumer_cluster(), consumer_client(), kReplicationGroupId,
narrow_cast<int>(producer_tables.size())));

// Write some data and verify replication.
for (size_t i = 0; i < producer_tables.size(); i++) {
ASSERT_OK(InsertRowsAndVerify(0, narrow_cast<int>(10 * (i + 1)), producer_tables[i]));
}
ASSERT_OK(DeleteUniverseReplication());
}

class XClusterTestWaitForReplicationDrain : public XClusterTest {
public:
void SetUpTablesAndReplication(
Expand Down
45 changes: 0 additions & 45 deletions src/yb/integration-tests/xcluster/xcluster_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -384,39 +384,6 @@ Status XClusterTestBase::SetupUniverseReplication(
return Status::OK();
}

Status XClusterTestBase::SetupNSUniverseReplication(
MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client,
const xcluster::ReplicationGroupId& replication_group_id, const std::string& producer_ns_name,
const YQLDatabase& producer_ns_type, SetupReplicationOptions opts) {
master::SetupNSUniverseReplicationRequestPB req;
master::SetupNSUniverseReplicationResponsePB resp;
req.set_replication_group_id(replication_group_id.ToString());
req.set_producer_ns_name(producer_ns_name);
req.set_producer_ns_type(producer_ns_type);

std::string master_addr = producer_cluster->GetMasterAddresses();
if (opts.leader_only) {
master_addr = VERIFY_RESULT(producer_cluster->GetLeaderMiniMaster())->bound_rpc_addr_str();
}
auto hp_vec = VERIFY_RESULT(HostPort::ParseStrings(master_addr, 0));
HostPortsToPBs(hp_vec, req.mutable_producer_master_addresses());

auto master_proxy = std::make_shared<master::MasterReplicationProxy>(
&consumer_client->proxy_cache(),
VERIFY_RESULT(consumer_cluster->GetLeaderMiniMaster())->bound_rpc_addr());

rpc::RpcController rpc;
rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout));
return WaitFor([&] () -> Result<bool> {
if (!master_proxy->SetupNSUniverseReplication(req, &resp, &rpc).ok()) {
return false;
} else if (resp.has_error()) {
return false;
}
return true;
}, MonoDelta::FromSeconds(30), "Setup namespace-level universe replication");
}

Status XClusterTestBase::VerifyUniverseReplication(master::GetUniverseReplicationResponsePB* resp) {
return VerifyUniverseReplication(kReplicationGroupId, resp);
}
Expand Down Expand Up @@ -458,18 +425,6 @@ Status XClusterTestBase::VerifyUniverseReplication(
MonoDelta::FromSeconds(kRpcTimeout), "Verify universe replication");
}

Status XClusterTestBase::VerifyNSUniverseReplication(
MiniCluster* consumer_cluster, YBClient* consumer_client,
const xcluster::ReplicationGroupId& replication_group_id, int num_expected_table) {
return LoggedWaitFor([&]() -> Result<bool> {
master::GetUniverseReplicationResponsePB resp;
auto s =
VerifyUniverseReplication(consumer_cluster, consumer_client, replication_group_id, &resp);
return s.ok() && resp.entry().replication_group_id() == replication_group_id &&
resp.entry().is_ns_replication() && resp.entry().tables_size() == num_expected_table;
}, MonoDelta::FromSeconds(kRpcTimeout), "Verify namespace-level universe replication");
}

Status XClusterTestBase::ToggleUniverseReplication(
MiniCluster* consumer_cluster, YBClient* consumer_client,
const xcluster::ReplicationGroupId& replication_group_id, bool is_enabled) {
Expand Down
10 changes: 0 additions & 10 deletions src/yb/integration-tests/xcluster/xcluster_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,6 @@ class XClusterTestBase : public YBTest {
const std::vector<xrepl::StreamId>& bootstrap_ids = {},
SetupReplicationOptions opts = SetupReplicationOptions());

Status SetupNSUniverseReplication(
MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client,
const xcluster::ReplicationGroupId& replication_group_id, const std::string& producer_ns_name,
const YQLDatabase& producer_ns_type,
SetupReplicationOptions opts = SetupReplicationOptions());

Status VerifyUniverseReplication(master::GetUniverseReplicationResponsePB* resp);

Status VerifyUniverseReplication(
Expand All @@ -217,10 +211,6 @@ class XClusterTestBase : public YBTest {
const xcluster::ReplicationGroupId& replication_group_id,
master::GetUniverseReplicationResponsePB* resp);

Status VerifyNSUniverseReplication(
MiniCluster* consumer_cluster, YBClient* consumer_client,
const xcluster::ReplicationGroupId& replication_group_id, int num_expected_table);

Status ToggleUniverseReplication(
MiniCluster* consumer_cluster, YBClient* consumer_client,
const xcluster::ReplicationGroupId& replication_group_id, bool is_enabled);
Expand Down
6 changes: 2 additions & 4 deletions src/yb/master/catalog_entity_info.proto
Original file line number Diff line number Diff line change
Expand Up @@ -684,10 +684,8 @@ message SysUniverseReplicationEntryPB {
// producer table ID -> producer CDC stream ID map.
map<string, string> table_streams = 6;

// Namespace-level replication setup.
optional bool is_ns_replication = 7 [default = false];
optional NamespaceIdentifierPB producer_namespace = 8;
optional NamespaceIdentifierPB consumer_namespace = 9;
// 7, 8, and 9 were used by Namespace-level replication but never populated in
// production so they are valid for reuse.

// Mapping from Producer Table ID to Producer->Consumer schema version mappings
map<string, SchemaVersionMappingEntryPB> schema_version_mappings = 10;
Expand Down
33 changes: 0 additions & 33 deletions src/yb/master/catalog_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -1476,12 +1476,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
WaitForReplicationDrainResponsePB* resp,
rpc::RpcContext* rpc);

// Setup Universe Replication for an entire producer namespace.
Status SetupNSUniverseReplication(
const SetupNSUniverseReplicationRequestPB* req,
SetupNSUniverseReplicationResponsePB* resp,
rpc::RpcContext* rpc);

// Returns the replication status.
Status GetReplicationStatus(
const GetReplicationStatusRequestPB* req,
Expand Down Expand Up @@ -1576,8 +1570,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf,

void ScheduleXReplParentTabletDeletionTask();

void ScheduleXClusterNSReplicationAddTableTask();

Result<scoped_refptr<TableInfo>> GetTableById(const TableId& table_id) const override;

void AddPendingBackFill(const TableId& id) override {
Expand Down Expand Up @@ -3029,21 +3021,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
const xcluster::ReplicationGroupId& replication_group_id, bool is_enabled,
ClusterConfigInfo::WriteLock* l);

void XClusterAddTableToNSReplication(
const xcluster::ReplicationGroupId& replication_group_id, CoarseTimePoint deadline);

// Find the list of producer table IDs that can be added to the current NS-level replication.
Status XClusterNSReplicationSyncWithProducer(
scoped_refptr<UniverseReplicationInfo> universe,
std::vector<TableId>* producer_tables_to_add,
bool* has_non_replicated_consumer_table);

// Compute the list of producer table IDs that have a name-matching consumer table.
Result<std::vector<TableId>> XClusterFindProducerConsumerOverlap(
std::shared_ptr<XClusterRpcTasks> producer_xcluster_rpc,
NamespaceIdentifierPB* producer_namespace, NamespaceIdentifierPB* consumer_namespace,
size_t* num_non_matched_consumer_tables);

// True when the cluster is a consumer of a NS-level replication stream.
std::atomic<bool> namespace_replication_enabled_{false};

Expand Down Expand Up @@ -3264,16 +3241,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf,
// True when the cluster is a producer of a valid replication stream.
std::atomic<bool> cdc_enabled_{false};

// Metadata on namespace-level replication setup. Map producer ID -> metadata.
struct NSReplicationInfo {
// Until after this time, no additional add table task will be scheduled.
// Actively modified by the background thread.
CoarseTimePoint next_add_table_task_time = CoarseTimePoint::max();
int num_accumulated_errors;
};
std::unordered_map<xcluster::ReplicationGroupId, NSReplicationInfo> namespace_replication_map_
GUARDED_BY(mutex_);

std::atomic<bool> pg_catalog_versions_bg_task_running_ = {false};
rpc::ScheduledTaskTracker refresh_ysql_pg_catalog_versions_task_;

Expand Down
13 changes: 0 additions & 13 deletions src/yb/master/master_replication.proto
Original file line number Diff line number Diff line change
Expand Up @@ -453,17 +453,6 @@ message WaitForReplicationDrainResponsePB {
repeated UndrainedStreamInfoPB undrained_stream_info = 2;
}

message SetupNSUniverseReplicationRequestPB {
optional string replication_group_id = 1;
repeated HostPortPB producer_master_addresses = 2;
optional string producer_ns_name = 3;
optional YQLDatabase producer_ns_type = 4;
}

message SetupNSUniverseReplicationResponsePB {
optional MasterErrorPB error = 1;
}

message UpdateConsumerOnProducerMetadataRequestPB {
optional string replication_group_id = 1;
optional string stream_id = 2;
Expand Down Expand Up @@ -831,8 +820,6 @@ service MasterReplication {
returns (IsSetupNamespaceReplicationWithBootstrapDoneResponsePB);
rpc IsSetupUniverseReplicationDone(IsSetupUniverseReplicationDoneRequestPB)
returns (IsSetupUniverseReplicationDoneResponsePB);
rpc SetupNSUniverseReplication(SetupNSUniverseReplicationRequestPB)
returns (SetupNSUniverseReplicationResponsePB);
rpc ChangeXClusterRole(ChangeXClusterRoleRequestPB)
returns (ChangeXClusterRoleResponsePB);

Expand Down
1 change: 0 additions & 1 deletion src/yb/master/master_replication_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ class MasterReplicationServiceImpl : public MasterServiceBase, public MasterRepl
(GetCDCDBStreamInfo)
(IsBootstrapRequired)
(WaitForReplicationDrain)
(SetupNSUniverseReplication)
(GetReplicationStatus)
(GetTableSchemaFromSysCatalog)
(ChangeXClusterRole)
Expand Down
Loading

0 comments on commit 58c8d4e

Please sign in to comment.