Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster manager: avoid immediate activation for dynamic inserted cluster when initialize #12783

Merged
merged 14 commits into from
Oct 16, 2020
68 changes: 34 additions & 34 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,16 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
// been setup for cross-thread updates to avoid needless updates during initialization. The order
// of operations here is important. We start by initializing the thread aware load balancer if
// needed. This must happen first so cluster updates are heard first by the load balancer.
auto cluster_data = active_clusters_.find(cluster.info()->name());
// Also, it assures that all of clusters which this function is called should be always active.
auto cluster_data = warming_clusters_.find(cluster.info()->name());
// We have a situation that clusters will be immediately active, such as static and primary
// cluster. So we must have this prevention logic here.
if (cluster_data != warming_clusters_.end()) {
clusterWarmingToActive(cluster.info()->name());
updateClusterCounts();
}
cluster_data = active_clusters_.find(cluster.info()->name());

if (cluster_data->second->thread_aware_lb_ != nullptr) {
cluster_data->second->thread_aware_lb_->initialize();
}
Expand Down Expand Up @@ -587,17 +596,6 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl
// The following init manager remove call is a NOP in the case we are already initialized.
// It's just kept here to avoid additional logic.
init_helper_.removeCluster(*existing_active_cluster->second->cluster_);
} else {
// Validate that warming clusters are not added to the init_helper_.
// NOTE: This loop is compiled out in optimized builds.
for (const std::list<Cluster*>& cluster_list :
{std::cref(init_helper_.primary_init_clusters_),
std::cref(init_helper_.secondary_init_clusters_)}) {
ASSERT(!std::any_of(cluster_list.begin(), cluster_list.end(),
[&existing_warming_cluster](Cluster* cluster) {
return existing_warming_cluster->second->cluster_.get() == cluster;
}));
}
}
cm_stats_.cluster_modified_.inc();
} else {
Expand All @@ -614,40 +612,39 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl
// the future we may decide to undergo a refactor to unify the logic but the effort/risk to
// do that right now does not seem worth it given that the logic is generally pretty clean
// and easy to understand.
const bool use_active_map =
init_helper_.state() != ClusterManagerInitHelper::State::AllClustersInitialized;
loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_);

if (use_active_map) {
const bool all_clusters_initialized =
init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
loadCluster(cluster, version_info, true, warming_clusters_);
auto& cluster_entry = warming_clusters_.at(cluster_name);
if (!all_clusters_initialized) {
ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name);
auto& cluster_entry = active_clusters_.at(cluster_name);
createOrUpdateThreadLocalCluster(*cluster_entry);
init_helper_.addCluster(*cluster_entry->cluster_);
} else {
auto& cluster_entry = warming_clusters_.at(cluster_name);
ENVOY_LOG(debug, "add/update cluster {} starting warming", cluster_name);
cluster_entry->cluster_->initialize([this, cluster_name] {
auto warming_it = warming_clusters_.find(cluster_name);
auto& cluster_entry = *warming_it->second;

// If the cluster is being updated, we need to cancel any pending merged updates.
// Otherwise, applyUpdates() will fire with a dangling cluster reference.
updates_map_.erase(cluster_name);

active_clusters_[cluster_name] = std::move(warming_it->second);
warming_clusters_.erase(warming_it);

ENVOY_LOG(debug, "warming cluster {} complete", cluster_name);
createOrUpdateThreadLocalCluster(cluster_entry);
onClusterInit(*cluster_entry.cluster_);
updateClusterCounts();
auto state_changed_cluster_entry = warming_clusters_.find(cluster_name);
createOrUpdateThreadLocalCluster(*state_changed_cluster_entry->second);
onClusterInit(*state_changed_cluster_entry->second->cluster_);
});
}

updateClusterCounts();
return true;
}

void ClusterManagerImpl::clusterWarmingToActive(const std::string& cluster_name) {
auto warming_it = warming_clusters_.find(cluster_name);
ASSERT(warming_it != warming_clusters_.end());

// If the cluster is being updated, we need to cancel any pending merged updates.
// Otherwise, applyUpdates() will fire with a dangling cluster reference.
updates_map_.erase(cluster_name);

active_clusters_[cluster_name] = std::move(warming_it->second);
warming_clusters_.erase(warming_it);
}

void ClusterManagerImpl::createOrUpdateThreadLocalCluster(ClusterData& cluster) {
tls_->runOnAllThreads([new_cluster = cluster.cluster_->info(),
thread_aware_lb_factory = cluster.loadBalancerFactory()](
Expand Down Expand Up @@ -702,6 +699,7 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
if (existing_warming_cluster != warming_clusters_.end() &&
existing_warming_cluster->second->added_via_api_) {
removed = true;
init_helper_.removeCluster(*existing_warming_cluster->second->cluster_);
warming_clusters_.erase(existing_warming_cluster);
ENVOY_LOG(info, "removing warming cluster {}", cluster_name);
}
Expand Down Expand Up @@ -804,7 +802,9 @@ void ClusterManagerImpl::updateClusterCounts() {
// Once cluster is warmed up, CDS is resumed, and ACK is sent to ADS, providing a
// signal to ADS to proceed with RDS updates.
// If we're in the middle of shutting down (ads_mux_ already gone) then this is irrelevant.
if (ads_mux_) {
const bool all_clusters_initialized =
init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
if (all_clusters_initialized && ads_mux_) {
const auto type_urls = Config::getAllVersionTypeUrls<envoy::config::cluster::v3::Cluster>();
const uint64_t previous_warming = cm_stats_.warming_clusters_.value();
if (previous_warming == 0 && !warming_clusters_.empty()) {
Expand Down
1 change: 1 addition & 0 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -482,6 +482,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
void onClusterInit(Cluster& cluster);
void postThreadLocalHealthFailure(const HostSharedPtr& host);
void updateClusterCounts();
void clusterWarmingToActive(const std::string& cluster_name);
void maybePrefetch(ThreadLocalClusterManagerImpl::ClusterEntryPtr& cluster_entry,
std::function<ConnectionPool::Instance*()> prefetch_pool);

Expand Down
4 changes: 2 additions & 2 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) {
last_updated:
seconds: 1234567891
nanos: 234000000
dynamic_active_clusters:
dynamic_warming_clusters:
- version_info: "version1"
cluster:
"@type": type.googleapis.com/envoy.config.cluster.v3.Cluster
Expand Down Expand Up @@ -1107,7 +1107,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) {
last_updated:
seconds: 1234567891
nanos: 234000000
dynamic_warming_clusters:
dynamic_active_clusters:
)EOF");

EXPECT_CALL(*cluster3, initialize(_));
Expand Down
20 changes: 20 additions & 0 deletions test/integration/ads_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1132,6 +1132,26 @@ class AdsClusterV3Test : public AdsIntegrationTest {
INSTANTIATE_TEST_SUITE_P(IpVersionsClientTypeDelta, AdsClusterV3Test,
DELTA_SOTW_GRPC_CLIENT_INTEGRATION_PARAMS);

TEST_P(AdsClusterV3Test, BasicClusterInitialWarming) {
initialize();
const auto cds_type_url = Config::getTypeUrl<envoy::config::cluster::v3::Cluster>(
envoy::config::core::v3::ApiVersion::V3);
const auto eds_type_url = Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(
envoy::config::core::v3::ApiVersion::V3);

EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true));
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
cds_type_url, {buildCluster("cluster_0")}, {buildCluster("cluster_0")}, {}, "1", false);
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1);
EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0"}, {"cluster_0"}, {}));
sendDiscoveryResponse<envoy::config::endpoint::v3::ClusterLoadAssignment>(
eds_type_url, {buildClusterLoadAssignment("cluster_0")},
{buildClusterLoadAssignment("cluster_0")}, {}, "1", false);

test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not a great test since you can't actually tell that the cluster was ever warming. Can you check that it's warming (don't respond to EDS), and then response to EDS and make sure it's active?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. like this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite. Verify that the warming gauge is 1 before you send the EDS response.

/wait

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done.

test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2);
}

// Verify CDS is paused during cluster warming.
TEST_P(AdsClusterV3Test, CdsPausedDuringWarming) {
initialize();
Expand Down