Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster manager: avoid immediate activation of dynamically inserted clusters during initialization #12783

Merged
merged 14 commits into from
Oct 16, 2020
54 changes: 31 additions & 23 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,13 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
// been setup for cross-thread updates to avoid needless updates during initialization. The order
// of operations here is important. We start by initializing the thread aware load balancer if
// needed. This must happen first so cluster updates are heard first by the load balancer.
auto cluster_data = active_clusters_.find(cluster.info()->name());
auto cluster_data = warming_clusters_.find(cluster.info()->name());
if (cluster_data != warming_clusters_.end()) {
clusterWarmingToActive(cluster.info()->name());
updateClusterCounts();
}
cluster_data = active_clusters_.find(cluster.info()->name());

if (cluster_data->second->thread_aware_lb_ != nullptr) {
cluster_data->second->thread_aware_lb_->initialize();
}
Expand Down Expand Up @@ -613,40 +619,39 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl
// the future we may decide to undergo a refactor to unify the logic but the effort/risk to
// do that right now does not seem worth it given that the logic is generally pretty clean
// and easy to understand.
const bool use_active_map =
init_helper_.state() != ClusterManagerInitHelper::State::AllClustersInitialized;
loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_);

if (use_active_map) {
const bool all_clusters_initialized =
init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
loadCluster(cluster, version_info, true, warming_clusters_);
auto& cluster_entry = warming_clusters_.at(cluster_name);
if (!all_clusters_initialized) {
ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name);
auto& cluster_entry = active_clusters_.at(cluster_name);
createOrUpdateThreadLocalCluster(*cluster_entry);
init_helper_.addCluster(*cluster_entry->cluster_);
} else {
auto& cluster_entry = warming_clusters_.at(cluster_name);
ENVOY_LOG(debug, "add/update cluster {} starting warming", cluster_name);
cluster_entry->cluster_->initialize([this, cluster_name] {
auto warming_it = warming_clusters_.find(cluster_name);
auto& cluster_entry = *warming_it->second;

// If the cluster is being updated, we need to cancel any pending merged updates.
// Otherwise, applyUpdates() will fire with a dangling cluster reference.
updates_map_.erase(cluster_name);

active_clusters_[cluster_name] = std::move(warming_it->second);
warming_clusters_.erase(warming_it);

ENVOY_LOG(debug, "warming cluster {} complete", cluster_name);
createOrUpdateThreadLocalCluster(cluster_entry);
onClusterInit(*cluster_entry.cluster_);
updateClusterCounts();
auto state_changed_cluster_entry = warming_clusters_.find(cluster_name);
createOrUpdateThreadLocalCluster(*state_changed_cluster_entry->second);
onClusterInit(*state_changed_cluster_entry->second->cluster_);
});
}

updateClusterCounts();
return true;
}

// Moves the cluster named `cluster_name` out of warming_clusters_ and into active_clusters_.
// The cluster is expected to be present in warming_clusters_; this is checked with ASSERT.
void ClusterManagerImpl::clusterWarmingToActive(const std::string& cluster_name) {
  auto warming_entry = warming_clusters_.find(cluster_name);
  ASSERT(warming_entry != warming_clusters_.end());

  // Drop any merged updates still pending for this name before the active entry is replaced.
  // If the cluster is being updated, a later applyUpdates() would otherwise fire with a
  // dangling cluster reference.
  updates_map_.erase(cluster_name);

  // Hand the cluster data over to the active map under the same name, then remove the
  // now moved-from entry from the warming map.
  active_clusters_[cluster_name] = std::move(warming_entry->second);
  warming_clusters_.erase(warming_entry);
}

void ClusterManagerImpl::createOrUpdateThreadLocalCluster(ClusterData& cluster) {
tls_->runOnAllThreads([this, new_cluster = cluster.cluster_->info(),
thread_aware_lb_factory = cluster.loadBalancerFactory()]() -> void {
Expand Down Expand Up @@ -695,6 +700,7 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
if (existing_warming_cluster != warming_clusters_.end() &&
existing_warming_cluster->second->added_via_api_) {
removed = true;
init_helper_.removeCluster(*existing_warming_cluster->second->cluster_);
warming_clusters_.erase(existing_warming_cluster);
ENVOY_LOG(info, "removing warming cluster {}", cluster_name);
}
Expand Down Expand Up @@ -797,7 +803,9 @@ void ClusterManagerImpl::updateClusterCounts() {
// Once cluster is warmed up, CDS is resumed, and ACK is sent to ADS, providing a
// signal to ADS to proceed with RDS updates.
// If we're in the middle of shutting down (ads_mux_ already gone) then this is irrelevant.
if (ads_mux_) {
const bool all_clusters_initialized =
init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
if (all_clusters_initialized && ads_mux_) {
const auto type_urls = Config::getAllVersionTypeUrls<envoy::config::cluster::v3::Cluster>();
const uint64_t previous_warming = cm_stats_.warming_clusters_.value();
if (previous_warming == 0 && !warming_clusters_.empty()) {
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
void onClusterInit(Cluster& cluster);
void postThreadLocalHealthFailure(const HostSharedPtr& host);
void updateClusterCounts();
void clusterWarmingToActive(const std::string& cluster_name);
void maybePrefetch(ThreadLocalClusterManagerImpl::ClusterEntryPtr& cluster_entry,
std::function<ConnectionPool::Instance*()> prefetch_pool);

Expand Down
4 changes: 2 additions & 2 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1056,7 +1056,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) {
last_updated:
seconds: 1234567891
nanos: 234000000
dynamic_active_clusters:
dynamic_warming_clusters:
- version_info: "version1"
cluster:
"@type": type.googleapis.com/envoy.config.cluster.v3.Cluster
Expand Down Expand Up @@ -1108,7 +1108,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) {
last_updated:
seconds: 1234567891
nanos: 234000000
dynamic_warming_clusters:
dynamic_active_clusters:
)EOF");

EXPECT_CALL(*cluster3, initialize(_));
Expand Down