diff --git a/src/yb/integration-tests/xcluster/xcluster-test.cc b/src/yb/integration-tests/xcluster/xcluster-test.cc index ddfcd23a0931..e2edadd47200 100644 --- a/src/yb/integration-tests/xcluster/xcluster-test.cc +++ b/src/yb/integration-tests/xcluster/xcluster-test.cc @@ -2877,130 +2877,6 @@ TEST_P(XClusterTest, DeleteTableChecksCQL) { } } -TEST_P(XClusterTest, SetupNSUniverseReplicationExtraConsumerTables) { - // Initial setup: create 2 tables on each side. - constexpr int kNTabletsPerTable = 3; - std::vector table_vector = {kNTabletsPerTable, kNTabletsPerTable}; - ASSERT_OK(SetUpWithParams(table_vector, 1)); - - // Create 2 more consumer tables. - for (int i = 2; i < 4; i++) { - auto t = ASSERT_RESULT(CreateTable( - consumer_client(), namespace_name, Format("test_table_$0", i), kNTabletsPerTable)); - consumer_tables_.push_back({}); - ASSERT_OK(consumer_client()->OpenTable(t, &consumer_tables_.back())); - } - - // Setup NS universe replication. Only the first 2 consumer tables will be replicated. - ANNOTATE_UNPROTECTED_WRITE(FLAGS_ns_replication_sync_retry_secs) = 1; - ASSERT_OK(SetupNSUniverseReplication( - producer_cluster(), consumer_cluster(), consumer_client(), kReplicationGroupId, - namespace_name, YQLDatabase::YQL_DATABASE_CQL)); - ASSERT_OK(VerifyNSUniverseReplication( - consumer_cluster(), consumer_client(), kReplicationGroupId, - narrow_cast(producer_tables_.size()))); - - // Create the additional 2 tables on producer. Verify that they are added automatically. - for (int i = 2; i < 4; i++) { - auto t = ASSERT_RESULT(CreateTable( - producer_client(), namespace_name, Format("test_table_$0", i), kNTabletsPerTable)); - producer_tables_.push_back({}); - ASSERT_OK(producer_client()->OpenTable(t, &producer_tables_.back())); - } - ASSERT_OK(VerifyNSUniverseReplication( - consumer_cluster(), consumer_client(), kReplicationGroupId, - narrow_cast(consumer_tables_.size()))); - - // Write some data and verify replication. - for (size_t i = 0; i < producer_tables_.size(); i++) { - ASSERT_OK(InsertRowsAndVerify(0, narrow_cast(10 * (i + 1)), producer_tables_[i])); - } - ASSERT_OK(DeleteUniverseReplication()); -} - -TEST_P(XClusterTest, SetupNSUniverseReplicationExtraProducerTables) { - // Initial setup: create 2 tables on each side. - constexpr int kNTabletsPerTable = 3; - std::vector table_vector = {kNTabletsPerTable, kNTabletsPerTable}; - ASSERT_OK(SetUpWithParams(table_vector, 1)); - - // Create 2 more producer tables. - for (int i = 2; i < 4; i++) { - auto t = ASSERT_RESULT(CreateTable( - producer_client(), namespace_name, Format("test_table_$0", i), kNTabletsPerTable)); - producer_tables_.push_back({}); - ASSERT_OK(producer_client()->OpenTable(t, &producer_tables_.back())); - } - - // Setup NS universe replication. Only the first 2 producer tables will be replicated. - ANNOTATE_UNPROTECTED_WRITE(FLAGS_ns_replication_sync_backoff_secs) = 1; - ASSERT_OK(SetupNSUniverseReplication( - producer_cluster(), consumer_cluster(), consumer_client(), kReplicationGroupId, - namespace_name, YQLDatabase::YQL_DATABASE_CQL)); - ASSERT_OK(VerifyNSUniverseReplication( - consumer_cluster(), consumer_client(), kReplicationGroupId, - narrow_cast(consumer_tables_.size()))); - - // Create the additional 2 tables on consumer. Verify that they are added automatically. - for (int i = 2; i < 4; i++) { - auto t = ASSERT_RESULT(CreateTable( - consumer_client(), namespace_name, Format("test_table_$0", i), kNTabletsPerTable)); - consumer_tables_.push_back({}); - ASSERT_OK(consumer_client()->OpenTable(t, &consumer_tables_.back())); - } - ASSERT_OK(VerifyNSUniverseReplication( - consumer_cluster(), consumer_client(), kReplicationGroupId, - narrow_cast(producer_tables_.size()))); - - // Write some data and verify replication. - for (size_t i = 0; i < producer_tables_.size(); i++) { - ASSERT_OK(InsertRowsAndVerify(0, narrow_cast(10 * (i + 1)), producer_tables_[i])); - } - ASSERT_OK(DeleteUniverseReplication()); -} - -TEST_P(XClusterTest, SetupNSUniverseReplicationTwoNamespace) { - // Create 2 tables in one namespace. - constexpr int kNTabletsPerTable = 1; - std::vector table_vector = {kNTabletsPerTable, kNTabletsPerTable}; - ASSERT_OK(SetUpWithParams(table_vector, 1)); - - // Create 2 tables in another namespace. - string kNamespaceName2 = "test_namespace_2"; - std::vector> producer_tables; - std::vector> consumer_tables; - producer_tables.reserve(2); - consumer_tables.reserve(2); - for (int i = 0; i < 2; i++) { - auto ptable = ASSERT_RESULT(CreateTable( - producer_client(), kNamespaceName2, Format("test_table_$0", i), kNTabletsPerTable)); - producer_tables.push_back({}); - ASSERT_OK(producer_client()->OpenTable(ptable, &producer_tables.back())); - - auto ctable = ASSERT_RESULT(CreateTable( - consumer_client(), kNamespaceName2, Format("test_table_$0", i), kNTabletsPerTable)); - consumer_tables.push_back({}); - ASSERT_OK(consumer_client()->OpenTable(ctable, &consumer_tables.back())); - } - - // Setup NS universe replication for the second namespace. Verify that the tables under the - // first namespace will not be added to the replication. - ANNOTATE_UNPROTECTED_WRITE(FLAGS_ns_replication_sync_backoff_secs) = 1; - ASSERT_OK(SetupNSUniverseReplication( - producer_cluster(), consumer_cluster(), consumer_client(), kReplicationGroupId, - kNamespaceName2, YQLDatabase::YQL_DATABASE_CQL)); - SleepFor(MonoDelta::FromSeconds(5)); // Let the bg thread run a few times. - ASSERT_OK(VerifyNSUniverseReplication( - consumer_cluster(), consumer_client(), kReplicationGroupId, - narrow_cast(producer_tables.size()))); - - // Write some data and verify replication. - for (size_t i = 0; i < producer_tables.size(); i++) { - ASSERT_OK(InsertRowsAndVerify(0, narrow_cast(10 * (i + 1)), producer_tables[i])); - } - ASSERT_OK(DeleteUniverseReplication()); -} - class XClusterTestWaitForReplicationDrain : public XClusterTest { public: void SetUpTablesAndReplication( diff --git a/src/yb/integration-tests/xcluster/xcluster_test_base.cc b/src/yb/integration-tests/xcluster/xcluster_test_base.cc index 851d7fbcaa41..aa0c53fe94ea 100644 --- a/src/yb/integration-tests/xcluster/xcluster_test_base.cc +++ b/src/yb/integration-tests/xcluster/xcluster_test_base.cc @@ -384,39 +384,6 @@ Status XClusterTestBase::SetupUniverseReplication( return Status::OK(); } -Status XClusterTestBase::SetupNSUniverseReplication( - MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client, - const xcluster::ReplicationGroupId& replication_group_id, const std::string& producer_ns_name, - const YQLDatabase& producer_ns_type, SetupReplicationOptions opts) { - master::SetupNSUniverseReplicationRequestPB req; - master::SetupNSUniverseReplicationResponsePB resp; - req.set_replication_group_id(replication_group_id.ToString()); - req.set_producer_ns_name(producer_ns_name); - req.set_producer_ns_type(producer_ns_type); - - std::string master_addr = producer_cluster->GetMasterAddresses(); - if (opts.leader_only) { - master_addr = VERIFY_RESULT(producer_cluster->GetLeaderMiniMaster())->bound_rpc_addr_str(); - } - auto hp_vec = VERIFY_RESULT(HostPort::ParseStrings(master_addr, 0)); - HostPortsToPBs(hp_vec, req.mutable_producer_master_addresses()); - - auto master_proxy = std::make_shared( - &consumer_client->proxy_cache(), - VERIFY_RESULT(consumer_cluster->GetLeaderMiniMaster())->bound_rpc_addr()); - - rpc::RpcController rpc; - rpc.set_timeout(MonoDelta::FromSeconds(kRpcTimeout)); - return WaitFor([&] () -> Result { - if (!master_proxy->SetupNSUniverseReplication(req, &resp, &rpc).ok()) { - return false; - } else if (resp.has_error()) { - return false; - } - return true; - }, MonoDelta::FromSeconds(30), "Setup namespace-level universe replication"); -} - Status XClusterTestBase::VerifyUniverseReplication(master::GetUniverseReplicationResponsePB* resp) { return VerifyUniverseReplication(kReplicationGroupId, resp); } @@ -458,18 +425,6 @@ Status XClusterTestBase::VerifyUniverseReplication( MonoDelta::FromSeconds(kRpcTimeout), "Verify universe replication"); } -Status XClusterTestBase::VerifyNSUniverseReplication( - MiniCluster* consumer_cluster, YBClient* consumer_client, - const xcluster::ReplicationGroupId& replication_group_id, int num_expected_table) { - return LoggedWaitFor([&]() -> Result { - master::GetUniverseReplicationResponsePB resp; - auto s = - VerifyUniverseReplication(consumer_cluster, consumer_client, replication_group_id, &resp); - return s.ok() && resp.entry().replication_group_id() == replication_group_id && - resp.entry().is_ns_replication() && resp.entry().tables_size() == num_expected_table; - }, MonoDelta::FromSeconds(kRpcTimeout), "Verify namespace-level universe replication"); -} - Status XClusterTestBase::ToggleUniverseReplication( MiniCluster* consumer_cluster, YBClient* consumer_client, const xcluster::ReplicationGroupId& replication_group_id, bool is_enabled) { diff --git a/src/yb/integration-tests/xcluster/xcluster_test_base.h b/src/yb/integration-tests/xcluster/xcluster_test_base.h index 37b7775d60fa..279715f4e64e 100644 --- a/src/yb/integration-tests/xcluster/xcluster_test_base.h +++ b/src/yb/integration-tests/xcluster/xcluster_test_base.h @@ -200,12 +200,6 @@ class XClusterTestBase : public YBTest { const std::vector& bootstrap_ids = {}, SetupReplicationOptions opts = SetupReplicationOptions()); - Status SetupNSUniverseReplication( - MiniCluster* producer_cluster, MiniCluster* consumer_cluster, YBClient* consumer_client, - const xcluster::ReplicationGroupId& replication_group_id, const std::string& producer_ns_name, - const YQLDatabase& producer_ns_type, - SetupReplicationOptions opts = SetupReplicationOptions()); - Status VerifyUniverseReplication(master::GetUniverseReplicationResponsePB* resp); Status VerifyUniverseReplication( @@ -217,10 +211,6 @@ class XClusterTestBase : public YBTest { const xcluster::ReplicationGroupId& replication_group_id, master::GetUniverseReplicationResponsePB* resp); - Status VerifyNSUniverseReplication( - MiniCluster* consumer_cluster, YBClient* consumer_client, - const xcluster::ReplicationGroupId& replication_group_id, int num_expected_table); - Status ToggleUniverseReplication( MiniCluster* consumer_cluster, YBClient* consumer_client, const xcluster::ReplicationGroupId& replication_group_id, bool is_enabled); diff --git a/src/yb/master/catalog_entity_info.proto b/src/yb/master/catalog_entity_info.proto index 9d34e1334179..248d8555fda2 100644 --- a/src/yb/master/catalog_entity_info.proto +++ b/src/yb/master/catalog_entity_info.proto @@ -684,10 +684,8 @@ message SysUniverseReplicationEntryPB { // producer table ID -> producer CDC stream ID map. map table_streams = 6; - // Namespace-level replication setup. - optional bool is_ns_replication = 7 [default = false]; - optional NamespaceIdentifierPB producer_namespace = 8; - optional NamespaceIdentifierPB consumer_namespace = 9; + // 7, 8, and 9 were used by Namespace-level replication but never populated in + // production so they are valid for reuse. // Mapping from Producer Table ID to Producer->Consumer schema version mappings map schema_version_mappings = 10; diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index ca29d8f2dd9d..138638e527c2 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -1476,12 +1476,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf, WaitForReplicationDrainResponsePB* resp, rpc::RpcContext* rpc); - // Setup Universe Replication for an entire producer namespace. - Status SetupNSUniverseReplication( - const SetupNSUniverseReplicationRequestPB* req, - SetupNSUniverseReplicationResponsePB* resp, - rpc::RpcContext* rpc); - // Returns the replication status. Status GetReplicationStatus( const GetReplicationStatusRequestPB* req, @@ -1576,8 +1570,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf, void ScheduleXReplParentTabletDeletionTask(); - void ScheduleXClusterNSReplicationAddTableTask(); - Result> GetTableById(const TableId& table_id) const override; void AddPendingBackFill(const TableId& id) override { @@ -3029,21 +3021,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf, const xcluster::ReplicationGroupId& replication_group_id, bool is_enabled, ClusterConfigInfo::WriteLock* l); - void XClusterAddTableToNSReplication( - const xcluster::ReplicationGroupId& replication_group_id, CoarseTimePoint deadline); - - // Find the list of producer table IDs that can be added to the current NS-level replication. - Status XClusterNSReplicationSyncWithProducer( - scoped_refptr universe, - std::vector* producer_tables_to_add, - bool* has_non_replicated_consumer_table); - - // Compute the list of producer table IDs that have a name-matching consumer table. - Result> XClusterFindProducerConsumerOverlap( - std::shared_ptr producer_xcluster_rpc, - NamespaceIdentifierPB* producer_namespace, NamespaceIdentifierPB* consumer_namespace, - size_t* num_non_matched_consumer_tables); - // True when the cluster is a consumer of a NS-level replication stream. std::atomic namespace_replication_enabled_{false}; @@ -3264,16 +3241,6 @@ class CatalogManager : public tserver::TabletPeerLookupIf, // True when the cluster is a producer of a valid replication stream. std::atomic cdc_enabled_{false}; - // Metadata on namespace-level replication setup. Map producer ID -> metadata. - struct NSReplicationInfo { - // Until after this time, no additional add table task will be scheduled. - // Actively modified by the background thread. - CoarseTimePoint next_add_table_task_time = CoarseTimePoint::max(); - int num_accumulated_errors; - }; - std::unordered_map namespace_replication_map_ - GUARDED_BY(mutex_); - std::atomic pg_catalog_versions_bg_task_running_ = {false}; rpc::ScheduledTaskTracker refresh_ysql_pg_catalog_versions_task_; diff --git a/src/yb/master/master_replication.proto b/src/yb/master/master_replication.proto index 47241c5897d8..ac4a558ec1ac 100644 --- a/src/yb/master/master_replication.proto +++ b/src/yb/master/master_replication.proto @@ -453,17 +453,6 @@ message WaitForReplicationDrainResponsePB { repeated UndrainedStreamInfoPB undrained_stream_info = 2; } -message SetupNSUniverseReplicationRequestPB { - optional string replication_group_id = 1; - repeated HostPortPB producer_master_addresses = 2; - optional string producer_ns_name = 3; - optional YQLDatabase producer_ns_type = 4; -} - -message SetupNSUniverseReplicationResponsePB { - optional MasterErrorPB error = 1; -} - message UpdateConsumerOnProducerMetadataRequestPB { optional string replication_group_id = 1; optional string stream_id = 2; @@ -831,8 +820,6 @@ service MasterReplication { returns (IsSetupNamespaceReplicationWithBootstrapDoneResponsePB); rpc IsSetupUniverseReplicationDone(IsSetupUniverseReplicationDoneRequestPB) returns (IsSetupUniverseReplicationDoneResponsePB); - rpc SetupNSUniverseReplication(SetupNSUniverseReplicationRequestPB) - returns (SetupNSUniverseReplicationResponsePB); rpc ChangeXClusterRole(ChangeXClusterRoleRequestPB) returns (ChangeXClusterRoleResponsePB); diff --git a/src/yb/master/master_replication_service.cc b/src/yb/master/master_replication_service.cc index b9ce83d56a61..4e3fa3b2671e 100644 --- a/src/yb/master/master_replication_service.cc +++ b/src/yb/master/master_replication_service.cc @@ -51,7 +51,6 @@ class MasterReplicationServiceImpl : public MasterServiceBase, public MasterRepl (GetCDCDBStreamInfo) (IsBootstrapRequired) (WaitForReplicationDrain) - (SetupNSUniverseReplication) (GetReplicationStatus) (GetTableSchemaFromSysCatalog) (ChangeXClusterRole) diff --git a/src/yb/master/xrepl_catalog_manager.cc b/src/yb/master/xrepl_catalog_manager.cc index ab1a122272df..e97d2e9388df 100644 --- a/src/yb/master/xrepl_catalog_manager.cc +++ b/src/yb/master/xrepl_catalog_manager.cc @@ -480,19 +480,6 @@ class UniverseReplicationLoader : public Visitoruniverses_to_clear_.push_back(ri->ReplicationGroupId()); } - // Check if this is a namespace-level replication. - if (l->pb.has_is_ns_replication() && l->pb.is_ns_replication()) { - DCHECK(!ContainsKey(catalog_manager_->namespace_replication_map_, replication_group_id)) - << "Duplicated namespace-level replication producer universe:" << replication_group_id; - catalog_manager_->namespace_replication_enabled_.store(true, std::memory_order_release); - - // Force the consumer to sync with producer immediately. - auto& metadata = - catalog_manager_ - ->namespace_replication_map_[replication_group_id]; - metadata.next_add_table_task_time = CoarseMonoClock::Now(); - } - l.Commit(); } @@ -5009,8 +4996,7 @@ Status CatalogManager::DeleteUniverseReplicationUnlocked( LOG(WARNING) << "Failed to remove replication info from map: replication_group_id: " << universe->id(); } - // If replication is at namespace-level, also remove from the namespace-level map. - namespace_replication_map_.erase(universe->ReplicationGroupId()); + // Also update the mapping of consumer tables. for (const auto& table : universe->metadata().state().pb.validated_tables()) { if (xcluster_consumer_table_stream_ids_map_[table.second].erase( @@ -5993,142 +5979,6 @@ Status CatalogManager::WaitForReplicationDrain( return Status::OK(); } -Status CatalogManager::SetupNSUniverseReplication( - const SetupNSUniverseReplicationRequestPB* req, - SetupNSUniverseReplicationResponsePB* resp, - rpc::RpcContext* rpc) { - LOG(INFO) << "SetupNSUniverseReplication from " << RequestorString(rpc) << ": " - << req->DebugString(); - - SCHECK( - req->has_replication_group_id() && !req->replication_group_id().empty(), InvalidArgument, - "Producer universe ID must be provided"); - SCHECK( - req->has_producer_ns_name() && !req->producer_ns_name().empty(), InvalidArgument, - "Producer universe namespace name must be provided"); - SCHECK( - req->has_producer_ns_type(), InvalidArgument, - "Producer universe namespace type must be provided"); - SCHECK( - req->producer_master_addresses_size() > 0, InvalidArgument, - "Producer master address must be provided"); - - std::string ns_name = req->producer_ns_name(); - YQLDatabase ns_type = req->producer_ns_type(); - switch (ns_type) { - case YQLDatabase::YQL_DATABASE_CQL: - break; - case YQLDatabase::YQL_DATABASE_PGSQL: - return STATUS( - InvalidArgument, "YSQL not currently supported for namespace-level replication setup"); - default: - return STATUS(InvalidArgument, Format("Unrecognized namespace type: $0", ns_type)); - } - - // 1. Find all producer tables with a name-matching consumer table. Ensure that no - // bootstrapping is required for these producer tables. - std::vector producer_tables; - NamespaceIdentifierPB producer_namespace; - NamespaceIdentifierPB consumer_namespace; - // namespace_id will be filled in XClusterFindProducerConsumerOverlap. - producer_namespace.set_name(ns_name); - producer_namespace.set_database_type(ns_type); - consumer_namespace.set_name(ns_name); - consumer_namespace.set_database_type(ns_type); - size_t num_non_matched_consumer_tables = 0; - { - std::vector hp; - HostPortsFromPBs(req->producer_master_addresses(), &hp); - std::string producer_addrs = HostPort::ToCommaSeparatedString(hp); - auto xcluster_rpc = VERIFY_RESULT(XClusterRpcTasks::CreateWithMasterAddrs( - xcluster::ReplicationGroupId(req->replication_group_id()), producer_addrs)); - producer_tables = VERIFY_RESULT(XClusterFindProducerConsumerOverlap( - xcluster_rpc, &producer_namespace, &consumer_namespace, &num_non_matched_consumer_tables)); - - // TODO: Remove this check after NS-level bootstrap is implemented. - auto bootstrap_required = - VERIFY_RESULT(xcluster_rpc->client()->IsBootstrapRequired(producer_tables)); - SCHECK( - !bootstrap_required, IllegalState, - Format("Producer tables under namespace $0 require bootstrapping.", ns_name)); - } - SCHECK( - !producer_tables.empty(), NotFound, - Format( - "No producer tables under namespace $0 can be set up for replication. Please make " - "sure that there are at least one pair of (producer, consumer) table with matching " - "name and schema in order to initialize the namespace-level replication.", - ns_name)); - - // 2. Setup universe replication for these producer tables. - { - SetupUniverseReplicationRequestPB setup_req; - SetupUniverseReplicationResponsePB setup_resp; - setup_req.set_replication_group_id(req->replication_group_id()); - setup_req.mutable_producer_master_addresses()->CopyFrom(req->producer_master_addresses()); - for (const auto& tid : producer_tables) { - setup_req.add_producer_table_ids(tid); - } - auto s = SetupUniverseReplication(&setup_req, &setup_resp, rpc); - if (!s.ok()) { - if (setup_resp.has_error()) { - resp->mutable_error()->Swap(setup_resp.mutable_error()); - return s; - } - return SetupError(resp->mutable_error(), s); - } - } - - // 3. Wait for the universe replication setup to finish. - // TODO: Put all the following code in an async task to avoid this expensive wait. - CoarseTimePoint deadline = rpc->GetClientDeadline(); - auto s = xcluster_manager_->WaitForSetupUniverseReplicationToFinish( - xcluster::ReplicationGroupId(req->replication_group_id()), deadline); - if (!s.ok()) { - return SetupError(resp->mutable_error(), s); - } - - // 4. Update the persisted data. - scoped_refptr universe; - { - SharedLock lock(mutex_); - TRACE("Acquired catalog manager lock"); - universe = FindPtrOrNull( - universe_replication_map_, xcluster::ReplicationGroupId(req->replication_group_id())); - if (universe == nullptr) { - return STATUS( - NotFound, "Could not find universe after SetupUniverseReplication", - req->ShortDebugString(), MasterError(MasterErrorPB::UNKNOWN_ERROR)); - } - } - auto l = universe->LockForWrite(); - l.mutable_data()->pb.set_is_ns_replication(true); - l.mutable_data()->pb.mutable_producer_namespace()->CopyFrom(producer_namespace); - l.mutable_data()->pb.mutable_consumer_namespace()->CopyFrom(consumer_namespace); - l.Commit(); - - // 5. Initialize in-memory entry and start the periodic task. - { - LockGuard lock(mutex_); - auto& metadata = - namespace_replication_map_[xcluster::ReplicationGroupId(req->replication_group_id())]; - if (num_non_matched_consumer_tables > 0) { - // Start the periodic sync immediately. - metadata.next_add_table_task_time = - CoarseMonoClock::Now() + - MonoDelta::FromSeconds(GetAtomicFlag(&FLAGS_ns_replication_sync_retry_secs)); - } else { - // Delay the sync since there are currently no non-replicated consumer tables. - metadata.next_add_table_task_time = - CoarseMonoClock::Now() + - MonoDelta::FromSeconds(GetAtomicFlag(&FLAGS_ns_replication_sync_backoff_secs)); - } - } - namespace_replication_enabled_.store(true, std::memory_order_release); - - return Status::OK(); -} - // Sync xcluster_consumer_replication_error_map_ with the streams we have in our producer_map. void CatalogManager::SyncXClusterConsumerReplicationStatusMap( const xcluster::ReplicationGroupId& replication_group_id, @@ -6912,9 +6762,6 @@ void CatalogManager::RunXReplBgTasks(const LeaderEpoch& epoch) { // Restart xCluster and CDCSDK parent tablet deletion bg task. StartXReplParentTabletDeletionTaskIfStopped(); - // Run periodic task for namespace-level replications. - ScheduleXClusterNSReplicationAddTableTask(); - WARN_NOT_OK( XClusterProcessPendingSchemaChanges(epoch), "Failed processing xCluster Pending Schema Changes"); @@ -7345,256 +7192,6 @@ std::shared_ptr CatalogManager::GetCDCServiceProxy(RemoteT void CatalogManager::SetCDCServiceEnabled() { cdc_enabled_.store(true, std::memory_order_release); } -void CatalogManager::ScheduleXClusterNSReplicationAddTableTask() { - if (!namespace_replication_enabled_.load(std::memory_order_acquire)) { - return; - } - - LockGuard lock(mutex_); - for (auto& map_entry : namespace_replication_map_) { - auto& metadata = map_entry.second; - if (CoarseMonoClock::Now() <= metadata.next_add_table_task_time) { - continue; - } - // Enqueue the async add table task, which involves syncing with producer and adding - // tables to the existing replication. - const auto& replication_group_id = map_entry.first; - CoarseTimePoint deadline = CoarseMonoClock::Now() + MonoDelta::FromSeconds(60); - auto s = background_tasks_thread_pool_->SubmitFunc(std::bind( - &CatalogManager::XClusterAddTableToNSReplication, this, replication_group_id, deadline)); - if (!s.ok()) { - // By not setting next_add_table_task_time, this enforces the task to be resheduled the - // next time the background thread runs. - LOG(WARNING) << "Failed to schedule: XClusterAddTableToNSReplication"; - } else { - // Prevent new tasks from being scheduled when the current task is running. - metadata.next_add_table_task_time = deadline; - } - } -} - -void CatalogManager::XClusterAddTableToNSReplication( - const xcluster::ReplicationGroupId& replication_group_id, CoarseTimePoint deadline) { - // TODO: In ScopeExit, find a way to report non-OK task_status to user. - bool has_non_replicated_consumer_table = true; - Status task_status = Status::OK(); - auto scope_exit = ScopeExit([&, this] { - LockGuard lock(mutex_); - auto ns_replication_info = FindOrNull(namespace_replication_map_, replication_group_id); - - // Only update metadata if we are the most recent task for this universe. - if (ns_replication_info && ns_replication_info->next_add_table_task_time == deadline) { - auto& metadata = *ns_replication_info; - // a. If there are error, emit to prometheus (TODO) and force another round of syncing. - // When there are too many consecutive errors, stop the task for a long period. - // b. Else if there is non-replicated consumer table, force another round of syncing. - // c. Else, stop the task temporarily. - if (!task_status.ok()) { - metadata.num_accumulated_errors++; - if (metadata.num_accumulated_errors == 5) { - metadata.num_accumulated_errors = 0; - metadata.next_add_table_task_time = - CoarseMonoClock::now() + - MonoDelta::FromSeconds(GetAtomicFlag(&FLAGS_ns_replication_sync_error_backoff_secs)); - } else { - metadata.next_add_table_task_time = - CoarseMonoClock::now() + - MonoDelta::FromSeconds(GetAtomicFlag(&FLAGS_ns_replication_sync_retry_secs)); - } - } else { - metadata.num_accumulated_errors = 0; - metadata.next_add_table_task_time = - CoarseMonoClock::now() + - MonoDelta::FromSeconds( - has_non_replicated_consumer_table - ? GetAtomicFlag(&FLAGS_ns_replication_sync_retry_secs) - : GetAtomicFlag(&FLAGS_ns_replication_sync_backoff_secs)); - } - } - }); - - if (deadline - CoarseMonoClock::Now() <= 1ms || !CheckIsLeaderAndReady().ok()) { - return; - } - - // 1. Sync with producer to find new producer tables that can be added to the current - // replication, and verify that these tables do not require bootstrapping. - scoped_refptr universe; - { - SharedLock lock(mutex_); - universe = FindPtrOrNull(universe_replication_map_, replication_group_id); - if (universe == nullptr) { - task_status = STATUS(NotFound, "Universe not found", replication_group_id); - LOG_WITH_FUNC(WARNING) << task_status; - return; - } - } - std::vector tables_to_add; - task_status = XClusterNSReplicationSyncWithProducer( - universe, &tables_to_add, &has_non_replicated_consumer_table); - if (!task_status.ok()) { - LOG_WITH_FUNC(WARNING) << "Error finding producer tables to add to universe " << universe->id() - << " : " << task_status; - return; - } - if (tables_to_add.empty()) { - return; - } - - // 2. Run AlterUniverseReplication to add the new tables to the current replication. - AlterUniverseReplicationRequestPB alter_req; - AlterUniverseReplicationResponsePB alter_resp; - alter_req.set_replication_group_id(replication_group_id.ToString()); - for (const auto& table : tables_to_add) { - alter_req.add_producer_table_ids_to_add(table); - } - - task_status = AlterUniverseReplication( - &alter_req, &alter_resp, /* RpcContext */ nullptr, GetLeaderEpochInternal()); - if (task_status.ok() && alter_resp.has_error()) { - task_status = StatusFromPB(alter_resp.error().status()); - } - if (!task_status.ok()) { - LOG_WITH_FUNC(WARNING) << "Unable to add producer tables to namespace-level replication: " - << task_status; - return; - } - - // 3. Wait for AlterUniverseReplication to finish. - task_status = xcluster_manager_->WaitForSetupUniverseReplicationToFinish( - xcluster::GetAlterReplicationGroupId(replication_group_id), deadline); - if (!task_status.ok()) { - LOG_WITH_FUNC(WARNING) << "Error while waiting for AlterUniverseReplication on " - << replication_group_id << " to complete: " << task_status; - return; - } - LOG_WITH_FUNC(INFO) << "Tables added to namespace-level replication " << universe->id() << " : " - << alter_req.ShortDebugString(); -} - -Status CatalogManager::XClusterNSReplicationSyncWithProducer( - scoped_refptr universe, - std::vector* producer_tables_to_add, - bool* has_non_replicated_consumer_table) { - auto l = universe->LockForRead(); - size_t num_non_matched_consumer_tables = 0; - - // 1. Find producer tables with a name-matching consumer table. - auto xcluster_rpc = - VERIFY_RESULT(universe->GetOrCreateXClusterRpcTasks(l->pb.producer_master_addresses())); - auto producer_namespace = l->pb.producer_namespace(); - auto consumer_namespace = l->pb.consumer_namespace(); - - auto producer_tables = VERIFY_RESULT(XClusterFindProducerConsumerOverlap( - xcluster_rpc, &producer_namespace, &consumer_namespace, &num_non_matched_consumer_tables)); - - // 2. Filter out producer tables that are already in the replication. - for (const auto& tid : producer_tables) { - if (ContainsKey(l->pb.validated_tables(), tid)) { - continue; - } - producer_tables_to_add->push_back(tid); - } - - // 3. If all consumer tables have a name-matching producer tables, and there is no additional - // producer table to add to the replication, this means that all consumer tables are - // currently replicated and we can stop the periodic sync temporarily. - *has_non_replicated_consumer_table = - num_non_matched_consumer_tables > 0 || !producer_tables_to_add->empty(); - - // 4. Finally, verify that all producer tables to be added do not require bootstrapping. - // TODO: Remove this check after NS-level bootstrap is implemented. - if (!producer_tables_to_add->empty()) { - auto bootstrap_required = - VERIFY_RESULT(xcluster_rpc->client()->IsBootstrapRequired(*producer_tables_to_add)); - if (bootstrap_required) { - std::ostringstream ptable_stream; - for (const auto& ptable : *producer_tables_to_add) { - ptable_stream << ptable << ","; - } - std::string ptable_str = ptable_stream.str(); - ptable_str.pop_back(); // Remove the last comma. - return STATUS( - IllegalState, - Format( - "Producer tables [$0] require bootstrapping, which is not currently " - "supported by the namespace-level replication setup.", - ptable_str)); - } - } - return Status::OK(); -} - -Result> CatalogManager::XClusterFindProducerConsumerOverlap( - std::shared_ptr producer_xcluster_rpc, - NamespaceIdentifierPB* producer_namespace, NamespaceIdentifierPB* consumer_namespace, - size_t* num_non_matched_consumer_tables) { - // TODO: Add support for colocated (parent) tables. Currently they are not supported because - // parent colocated tables are system tables and are therefore excluded by ListUserTables. - SCHECK(producer_xcluster_rpc != nullptr, InternalError, "Producer CDC RPC is null"); - - // 1. Find all producer tables. Also record the producer namespace ID. - auto producer_tables = VERIFY_RESULT(producer_xcluster_rpc->client()->ListUserTables( - *producer_namespace, true /* include_indexes */)); - SCHECK( - !producer_tables.empty(), NotFound, - "No producer table found under namespace " + producer_namespace->ShortDebugString()); - - if (!producer_tables.empty()) { - producer_namespace->set_id(producer_tables[0].namespace_id()); - } - - // 2. Find all consumer tables. Only collect the table names as we are doing name matching. - // Also record the consumer namespace ID. - std::unordered_set consumer_tables; - { - ListTablesRequestPB list_req; - ListTablesResponsePB list_resp; - list_req.add_relation_type_filter(USER_TABLE_RELATION); - list_req.add_relation_type_filter(INDEX_TABLE_RELATION); - list_req.mutable_namespace_()->CopyFrom(*consumer_namespace); - - auto s = ListTables(&list_req, &list_resp); - std::ostringstream error_stream; - if (!s.ok() || list_resp.has_error()) { - error_stream << (!s.ok() ? s.ToString() : list_resp.error().status().message()); - } - SCHECK( - list_resp.tables_size() > 0, NotFound, - Format( - "No consumer table found under namespace $0. Error: $1", - consumer_namespace->ShortDebugString(), error_stream.str())); - for (const auto& table : list_resp.tables()) { - auto table_name = Format( - "$0.$1.$2", - table.namespace_().name(), - table.pgschema_name(), // Empty for YCQL tables. - table.name()); - consumer_tables.insert(table_name); - } - consumer_namespace->set_id(list_resp.tables(0).namespace_().id()); - } - - // 3. Find producer tables with a name-matching consumer table. - std::vector overlap_tables; - for (const auto& table : producer_tables) { - auto table_name = Format( - "$0.$1.$2", - table.namespace_name(), - table.pgschema_name(), // Empty for YCQL tables. - table.table_name()); - if (consumer_tables.contains(table_name)) { - overlap_tables.push_back(table.table_id()); - consumer_tables.erase(table_name); - } - } - - // 4. Count the number of consumer tables without a name-matching producer table. - *num_non_matched_consumer_tables = consumer_tables.size(); - - return overlap_tables; -} - Result> CatalogManager::GetTableById(const TableId& table_id) const { return FindTableById(table_id); } diff --git a/src/yb/tools/yb-admin-xcluster-test.cc b/src/yb/tools/yb-admin-xcluster-test.cc index 2a46e49b881d..c2e322e3f277 100644 --- a/src/yb/tools/yb-admin-xcluster-test.cc +++ b/src/yb/tools/yb-admin-xcluster-test.cc @@ -64,7 +64,6 @@ using rpc::RpcController; namespace { const string kFakeUuid = "11111111111111111111111111111111"; -const string kBootstrapArg = "bootstrap"; Result GetRecentStreamId(MiniCluster* cluster, TabletId target_table_id = "") { // Return the first stream with tablet_id matching target_table_id using ListCDCStreams. @@ -463,111 +462,6 @@ TEST_F(XClusterAdminCliTest, TestSetupUniverseReplicationCleanupOnFailure) { ASSERT_OK(RunAdminToolCommand("delete_universe_replication", kProducerClusterId)); } -TEST_F(XClusterAdminCliTest, TestSetupNamespaceReplicationWithBootstrap) { - client::TableHandle producer_cluster_table; - const client::YBTableName kTestTableName(YQL_DATABASE_CQL, "my_keyspace", "test_table"); - - // Create an identical table on the producer. - client::kv_table_test::CreateTable( - Transactional::kTrue, NumTablets(), producer_cluster_client_.get(), &producer_cluster_table, - kTestTableName); - - const auto& producer_namespace = "ycql.my_keyspace"; - // Setup universe replication, this should only return once complete. - ASSERT_OK(RunAdminToolCommand( - "setup_namespace_universe_replication", kProducerClusterId, - producer_cluster_->GetMasterAddresses(), producer_namespace, kBootstrapArg)); - - // Check that the stream was properly created for this table. - ASSERT_OK(CheckTableIsBeingReplicated({producer_cluster_table->id()})); - - // Delete this universe so shutdown can proceed. - ASSERT_OK(RunAdminToolCommand("delete_universe_replication", kProducerClusterId)); -} - -TEST_F(XClusterAdminCliTest, TestSetupNamespaceReplicationWithBootstrapFailTransactionalCQL) { - client::TableHandle producer_cluster_table; - const client::YBTableName kTestTableName(YQL_DATABASE_CQL, "my_keyspace", "test_table"); - - // Create an identical table on the producer. - client::kv_table_test::CreateTable( - Transactional::kTrue, NumTablets(), producer_cluster_client_.get(), &producer_cluster_table, - kTestTableName); - - const auto& producer_namespace = "ycql.my_keyspace"; - ASSERT_NOK(RunAdminToolCommand( - "setup_namespace_universe_replication", kProducerClusterId, - producer_cluster_->GetMasterAddresses(), producer_namespace, kBootstrapArg, "transactional")); -} - -TEST_F(XClusterAdminCliTest, TestSetupNamespaceReplicationWithBootstrapFailInvalidArgumentOrder) { - client::TableHandle producer_cluster_table; - const client::YBTableName kTestTableName(YQL_DATABASE_CQL, "my_keyspace", "test_table"); - - // Create an identical table on the producer. - client::kv_table_test::CreateTable( - Transactional::kTrue, NumTablets(), producer_cluster_client_.get(), &producer_cluster_table, - kTestTableName); - - const auto& producer_namespace = "ycql.my_keyspace"; - - ASSERT_NOK(RunAdminToolCommand( - "setup_namespace_universe_replication", producer_cluster_->GetMasterAddresses(), - kProducerClusterId, producer_namespace, kBootstrapArg)); - - ASSERT_NOK(RunAdminToolCommand( - "setup_namespace_universe_replication", producer_cluster_->GetMasterAddresses(), - producer_namespace, kProducerClusterId, kBootstrapArg)); - - ASSERT_NOK(RunAdminToolCommand( - "setup_namespace_universe_replication", producer_namespace, - producer_cluster_->GetMasterAddresses(), kProducerClusterId, kBootstrapArg)); - - ASSERT_NOK(RunAdminToolCommand( - "setup_namespace_universe_replication", producer_namespace, kProducerClusterId, - producer_cluster_->GetMasterAddresses(), kBootstrapArg)); -} - -TEST_F(XClusterAdminCliTest, TestSetupNamespaceReplicationWithBootstrapFailInvalidNamespace) { - client::TableHandle producer_cluster_table; - const client::YBTableName kTestTableName(YQL_DATABASE_CQL, "my_keyspace", "test_table"); - - // Create an identical table on the producer. - client::kv_table_test::CreateTable( - Transactional::kTrue, NumTablets(), producer_cluster_client_.get(), &producer_cluster_table, - kTestTableName); - - - ASSERT_NOK(RunAdminToolCommand( - "setup_namespace_universe_replication", kProducerClusterId, - producer_cluster_->GetMasterAddresses(), "my_keyspace.ycql", kBootstrapArg)); - - ASSERT_NOK(RunAdminToolCommand( - "setup_namespace_universe_replication", kProducerClusterId, - producer_cluster_->GetMasterAddresses(), "my_keyspace.ysql", kBootstrapArg)); -} - -TEST_F(XClusterAdminCliTest, TestSetupNamespaceReplicationWithBootstrapFailInvalidNumArgs) { - client::TableHandle producer_cluster_table; - const client::YBTableName kTestTableName(YQL_DATABASE_CQL, "my_keyspace", "test_table"); - - // Create an identical table on the producer. - client::kv_table_test::CreateTable( - Transactional::kTrue, NumTablets(), producer_cluster_client_.get(), &producer_cluster_table, - kTestTableName); - - ASSERT_NOK(RunAdminToolCommand("setup_namespace_universe_replication", kProducerClusterId)); - - ASSERT_NOK(RunAdminToolCommand( - "setup_namespace_universe_replication", kProducerClusterId, - producer_cluster_->GetMasterAddresses())); - - ASSERT_NOK(RunAdminToolCommand( - "setup_namespace_universe_replication", kProducerClusterId, - producer_cluster_->GetMasterAddresses(), "my_keyspace.ysql", kBootstrapArg, kBootstrapArg, - kBootstrapArg)); -} - TEST_F(XClusterAdminCliTest, TestListCdcStreamsWithBootstrappedStreams) { const int kStreamUuidLength = 32; client::TableHandle producer_cluster_table; diff --git a/src/yb/tools/yb-admin_cli.cc b/src/yb/tools/yb-admin_cli.cc index d54106c19b5b..348a408c8250 100644 --- a/src/yb/tools/yb-admin_cli.cc +++ b/src/yb/tools/yb-admin_cli.cc @@ -2215,57 +2215,6 @@ Status wait_for_replication_drain_action( return client->WaitForReplicationDrain(stream_ids, target_time); } -const auto setup_namespace_universe_replication_args = - " [bootstrap] [transactional]"; -Status setup_namespace_universe_replication_action( - const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) { - RETURN_NOT_OK(CheckArgumentsCount(args.size(), 3, 5)); - const string replication_group_id = args[0]; - vector producer_addresses; - boost::split(producer_addresses, args[1], boost::is_any_of(",")); - TypedNamespaceName producer_namespace = VERIFY_RESULT(ParseNamespaceName(args[2])); - - bool bootstrap = false; - bool transactional = false; - if (args.size() > 3) { - switch (args.size()) { - case 4: - if (IsEqCaseInsensitive(args[3], "bootstrap")) { - bootstrap = true; - } else if (IsEqCaseInsensitive(args[3], "transactional")) { - transactional = true; - } - break; - case 5: { - if (IsEqCaseInsensitive(args[3], "bootstrap") && - IsEqCaseInsensitive(args[4], "transactional")) { - transactional = true; - bootstrap = true; - } else { - return ClusterAdminCli::kInvalidArguments; - } - break; - } - default: - return ClusterAdminCli::kInvalidArguments; - } - } - - if (bootstrap) { - RETURN_NOT_OK_PREPEND( - client->SetupNamespaceReplicationWithBootstrap( - replication_group_id, producer_addresses, producer_namespace, transactional), - Format("Unable to setup replication from universe $0", replication_group_id)); - } else { - RETURN_NOT_OK_PREPEND( - client->SetupNSUniverseReplication( - replication_group_id, producer_addresses, producer_namespace), - Format("Unable to setup namespace replication from universe $0", replication_group_id)); - } - - return Status::OK(); -} - const auto get_replication_status_args = "[]"; Status get_replication_status_action( const ClusterAdminCli::CLIArguments& args, ClusterAdminClient* client) { @@ -2788,7 +2737,6 @@ void ClusterAdminCli::RegisterCommandHandlers() { REGISTER_COMMAND(setup_universe_replication); REGISTER_COMMAND(delete_universe_replication); REGISTER_COMMAND(alter_universe_replication); - REGISTER_COMMAND(setup_namespace_universe_replication); REGISTER_COMMAND(set_universe_replication_enabled); REGISTER_COMMAND(get_replication_status); REGISTER_COMMAND(get_xcluster_safe_time); diff --git a/src/yb/tools/yb-admin_client.cc b/src/yb/tools/yb-admin_client.cc index 9796310c6a44..5862be7ec695 100644 --- a/src/yb/tools/yb-admin_client.cc +++ b/src/yb/tools/yb-admin_client.cc @@ -4290,46 +4290,6 @@ Status ClusterAdminClient::WaitForReplicationDrain( return Status::OK(); } -Status ClusterAdminClient::SetupNSUniverseReplication( - const std::string& replication_group_id, - const std::vector& producer_addresses, - const TypedNamespaceName& producer_namespace) { - switch (producer_namespace.db_type) { - case YQL_DATABASE_CQL: - break; - case YQL_DATABASE_PGSQL: - return STATUS( - InvalidArgument, "YSQL not currently supported for namespace-level replication setup"); - default: - return STATUS(InvalidArgument, "Unsupported namespace type"); - } - - master::SetupNSUniverseReplicationRequestPB req; - master::SetupNSUniverseReplicationResponsePB resp; - req.set_replication_group_id(replication_group_id); - req.set_producer_ns_name(producer_namespace.name); - req.set_producer_ns_type(producer_namespace.db_type); - - req.mutable_producer_master_addresses()->Reserve(narrow_cast(producer_addresses.size())); - for (const auto& addr : producer_addresses) { - auto hp = VERIFY_RESULT(HostPort::FromString(addr, master::kMasterDefaultPort)); - HostPortToPB(hp, req.add_producer_master_addresses()); - } - - RpcController rpc; - rpc.set_timeout(timeout_); - RETURN_NOT_OK(master_replication_proxy_->SetupNSUniverseReplication(req, &resp, &rpc)); - - if (resp.has_error()) { - cout << "Error setting up namespace-level universe replication: " - << resp.error().status().message() << endl; - return StatusFromPB(resp.error().status()); - } - - cout << "Namespace-level replication setup successfully" << endl; - return Status::OK(); -} - Status ClusterAdminClient::GetReplicationInfo(const std::string& replication_group_id) { master::GetReplicationStatusRequestPB req; master::GetReplicationStatusResponsePB resp; diff --git a/src/yb/tools/yb-admin_client.h b/src/yb/tools/yb-admin_client.h index a8088b91a29d..e41b57c6cec8 100644 --- a/src/yb/tools/yb-admin_client.h +++ b/src/yb/tools/yb-admin_client.h @@ -454,10 +454,6 @@ class ClusterAdminClient { Status WaitForReplicationDrain( const std::vector& stream_ids, const std::string& target_time); - Status SetupNSUniverseReplication(const std::string& replication_group_id, - const std::vector& producer_addresses, - const TypedNamespaceName& producer_namespace); - Status GetReplicationInfo(const std::string& replication_group_id); Result GetXClusterSafeTime(bool include_lag_and_skew = false);