Skip to content

Commit

Permalink
[1.8] fix cluster early warm up (#285)
Browse files Browse the repository at this point in the history
* cluster manager: avoid immediate activation for dynamic inserted cluster when initialize (envoyproxy#12783)

Signed-off-by: Shikugawa <[email protected]>
Signed-off-by: Yuchen Dai <[email protected]>

* cluster: unstuck cluster manager when update the initializing cluster (envoyproxy#13875)

Signed-off-by: Yuchen Dai <[email protected]>

Co-authored-by: Rei Shimizu <[email protected]>
  • Loading branch information
lambdai and Shikugawa authored Nov 11, 2020
1 parent a490de8 commit cd87e82
Show file tree
Hide file tree
Showing 7 changed files with 366 additions and 71 deletions.
107 changes: 65 additions & 42 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,20 @@ void ClusterManagerInitHelper::addCluster(Cluster& cluster) {

const auto initialize_cb = [&cluster, this] { onClusterInit(cluster); };
if (cluster.initializePhase() == Cluster::InitializePhase::Primary) {
// Remove the previous cluster before the cluster object is destroyed.
primary_init_clusters_.remove_if(
[name_to_remove = cluster.info()->name()](Cluster* cluster_iter) {
return cluster_iter->info()->name() == name_to_remove;
});
primary_init_clusters_.push_back(&cluster);
cluster.initialize(initialize_cb);
} else {
ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary);
// Remove the previous cluster before the cluster object is destroyed.
secondary_init_clusters_.remove_if(
[name_to_remove = cluster.info()->name()](Cluster* cluster_iter) {
return cluster_iter->info()->name() == name_to_remove;
});
secondary_init_clusters_.push_back(&cluster);
if (started_secondary_initialize_) {
// This can happen if we get a second CDS update that adds new clusters after we have
Expand Down Expand Up @@ -417,7 +427,16 @@ void ClusterManagerImpl::onClusterInit(Cluster& cluster) {
// been setup for cross-thread updates to avoid needless updates during initialization. The order
// of operations here is important. We start by initializing the thread aware load balancer if
// needed. This must happen first so cluster updates are heard first by the load balancer.
auto cluster_data = active_clusters_.find(cluster.info()->name());
// This also guarantees that every cluster this function is called for ends up active.
auto cluster_data = warming_clusters_.find(cluster.info()->name());
// Some clusters, such as static and primary clusters, become active immediately and never
// appear in the warming map, so the warming-to-active promotion must be guarded by this check.
if (cluster_data != warming_clusters_.end()) {
clusterWarmingToActive(cluster.info()->name());
updateClusterCounts();
}
cluster_data = active_clusters_.find(cluster.info()->name());

if (cluster_data->second->thread_aware_lb_ != nullptr) {
cluster_data->second->thread_aware_lb_->initialize();
}
Expand Down Expand Up @@ -587,17 +606,6 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl
// The following init manager remove call is a NOP in the case we are already initialized.
// It's just kept here to avoid additional logic.
init_helper_.removeCluster(*existing_active_cluster->second->cluster_);
} else {
// Validate that warming clusters are not added to the init_helper_.
// NOTE: This loop is compiled out in optimized builds.
for (const std::list<Cluster*>& cluster_list :
{std::cref(init_helper_.primary_init_clusters_),
std::cref(init_helper_.secondary_init_clusters_)}) {
ASSERT(!std::any_of(cluster_list.begin(), cluster_list.end(),
[&existing_warming_cluster](Cluster* cluster) {
return existing_warming_cluster->second->cluster_.get() == cluster;
}));
}
}
cm_stats_.cluster_modified_.inc();
} else {
Expand All @@ -614,40 +622,41 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl
// the future we may decide to undergo a refactor to unify the logic but the effort/risk to
// do that right now does not seem worth it given that the logic is generally pretty clean
// and easy to understand.
const bool use_active_map =
init_helper_.state() != ClusterManagerInitHelper::State::AllClustersInitialized;
loadCluster(cluster, version_info, true, use_active_map ? active_clusters_ : warming_clusters_);

if (use_active_map) {
const bool all_clusters_initialized =
init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
// Hold on to the previous cluster data so that it is not destroyed early. The replacement
// cluster must be added before the previous one is destroyed; otherwise initialization could
// be reported as complete prematurely.
const auto previous_cluster = loadCluster(cluster, version_info, true, warming_clusters_);
auto& cluster_entry = warming_clusters_.at(cluster_name);
if (!all_clusters_initialized) {
ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name);
auto& cluster_entry = active_clusters_.at(cluster_name);
createOrUpdateThreadLocalCluster(*cluster_entry);
init_helper_.addCluster(*cluster_entry->cluster_);
} else {
auto& cluster_entry = warming_clusters_.at(cluster_name);
ENVOY_LOG(debug, "add/update cluster {} starting warming", cluster_name);
cluster_entry->cluster_->initialize([this, cluster_name] {
auto warming_it = warming_clusters_.find(cluster_name);
auto& cluster_entry = *warming_it->second;

// If the cluster is being updated, we need to cancel any pending merged updates.
// Otherwise, applyUpdates() will fire with a dangling cluster reference.
updates_map_.erase(cluster_name);

active_clusters_[cluster_name] = std::move(warming_it->second);
warming_clusters_.erase(warming_it);

ENVOY_LOG(debug, "warming cluster {} complete", cluster_name);
createOrUpdateThreadLocalCluster(cluster_entry);
onClusterInit(*cluster_entry.cluster_);
updateClusterCounts();
auto state_changed_cluster_entry = warming_clusters_.find(cluster_name);
createOrUpdateThreadLocalCluster(*state_changed_cluster_entry->second);
onClusterInit(*state_changed_cluster_entry->second->cluster_);
});
}

updateClusterCounts();
return true;
}

// Promotes the cluster named `cluster_name` from the warming map to the active map.
// The cluster is expected to be present in the warming map (checked with ASSERT).
void ClusterManagerImpl::clusterWarmingToActive(const std::string& cluster_name) {
  const auto warming_entry = warming_clusters_.find(cluster_name);
  ASSERT(warming_entry != warming_clusters_.end());

  // Drop any pending merged updates for this cluster first; otherwise applyUpdates() would
  // later fire with a dangling cluster reference.
  updates_map_.erase(cluster_name);

  // Hand the cluster data over to the active map (overwriting any entry already stored under
  // this name), then remove the now-empty warming entry.
  auto& active_slot = active_clusters_[cluster_name];
  active_slot = std::move(warming_entry->second);
  warming_clusters_.erase(warming_entry);
}

void ClusterManagerImpl::createOrUpdateThreadLocalCluster(ClusterData& cluster) {
tls_->runOnAllThreads([new_cluster = cluster.cluster_->info(),
thread_aware_lb_factory = cluster.loadBalancerFactory()](
Expand Down Expand Up @@ -702,6 +711,7 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
if (existing_warming_cluster != warming_clusters_.end() &&
existing_warming_cluster->second->added_via_api_) {
removed = true;
init_helper_.removeCluster(*existing_warming_cluster->second->cluster_);
warming_clusters_.erase(existing_warming_cluster);
ENVOY_LOG(info, "removing warming cluster {}", cluster_name);
}
Expand All @@ -716,9 +726,10 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
return removed;
}

void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
const std::string& version_info, bool added_via_api,
ClusterMap& cluster_map) {
ClusterManagerImpl::ClusterDataPtr
ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
const std::string& version_info, bool added_via_api,
ClusterMap& cluster_map) {
std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr> new_cluster_pair =
factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api);
auto& new_cluster = new_cluster_pair.first;
Expand Down Expand Up @@ -763,11 +774,20 @@ void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster&
}
});
}

cluster_map[cluster_reference.info()->name()] = std::make_unique<ClusterData>(
cluster, version_info, added_via_api, std::move(new_cluster), time_source_);
const auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name());

ClusterDataPtr result;
auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name());
if (cluster_entry_it != cluster_map.end()) {
result = std::exchange(cluster_entry_it->second,
std::make_unique<ClusterData>(cluster, version_info, added_via_api,
std::move(new_cluster), time_source_));
} else {
bool inserted = false;
std::tie(cluster_entry_it, inserted) =
cluster_map.emplace(cluster_reference.info()->name(),
std::make_unique<ClusterData>(cluster, version_info, added_via_api,
std::move(new_cluster), time_source_));
ASSERT(inserted);
}
// If an LB is thread aware, create it here. The LB is not initialized until cluster pre-init
// finishes. For RingHash/Maglev don't create the LB here if subset balancing is enabled
// (because the thread_aware_lb_ field takes precedence over the subset lb).
Expand All @@ -790,6 +810,7 @@ void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster&
}

updateClusterCounts();
return result;
}

void ClusterManagerImpl::updateClusterCounts() {
Expand All @@ -804,7 +825,9 @@ void ClusterManagerImpl::updateClusterCounts() {
// Once cluster is warmed up, CDS is resumed, and ACK is sent to ADS, providing a
// signal to ADS to proceed with RDS updates.
// If we're in the middle of shutting down (ads_mux_ already gone) then this is irrelevant.
if (ads_mux_) {
const bool all_clusters_initialized =
init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
if (all_clusters_initialized && ads_mux_) {
const auto type_urls = Config::getAllVersionTypeUrls<envoy::config::cluster::v3::Cluster>();
const uint64_t previous_warming = cm_stats_.warming_clusters_.value();
if (previous_warming == 0 && !warming_clusters_.empty()) {
Expand Down
11 changes: 9 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -477,11 +477,18 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
void createOrUpdateThreadLocalCluster(ClusterData& cluster);
ProtobufTypes::MessagePtr dumpClusterConfigs();
static ClusterManagerStats generateStats(Stats::Scope& scope);
void loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
const std::string& version_info, bool added_via_api, ClusterMap& cluster_map);

/**
* @return the ClusterDataPtr previously stored in cluster_map under the same cluster name, or
* nullptr if cluster_map had no entry for that name.
*/
ClusterDataPtr loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
const std::string& version_info, bool added_via_api,
ClusterMap& cluster_map);
void onClusterInit(Cluster& cluster);
void postThreadLocalHealthFailure(const HostSharedPtr& host);
void updateClusterCounts();
void clusterWarmingToActive(const std::string& cluster_name);
void maybePrefetch(ThreadLocalClusterManagerImpl::ClusterEntryPtr& cluster_entry,
std::function<ConnectionPool::Instance*()> prefetch_pool);

Expand Down
133 changes: 131 additions & 2 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1055,7 +1055,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) {
last_updated:
seconds: 1234567891
nanos: 234000000
dynamic_active_clusters:
dynamic_warming_clusters:
- version_info: "version1"
cluster:
"@type": type.googleapis.com/envoy.config.cluster.v3.Cluster
Expand Down Expand Up @@ -1107,7 +1107,7 @@ TEST_F(ClusterManagerImplTest, InitializeOrder) {
last_updated:
seconds: 1234567891
nanos: 234000000
dynamic_warming_clusters:
dynamic_active_clusters:
)EOF");

EXPECT_CALL(*cluster3, initialize(_));
Expand Down Expand Up @@ -1233,6 +1233,105 @@ TEST_F(ClusterManagerImplTest, RemoveWarmingCluster) {
EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get()));
}

// Verifies that the cluster manager still finishes initialization when a dynamically added
// cluster that is still warming is replaced, during initialization, by a definition of the same
// name that becomes ready immediately.
TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) {
const std::string json = fmt::sprintf(
R"EOF(
{
"dynamic_resources": {
"cds_config": {
"api_config_source": {
"api_type": "0",
"refresh_delay": "30s",
"cluster_names": ["cds_cluster"]
}
}
},
"static_resources": {
%s
}
}
)EOF",
clustersJson({
defaultStaticClusterJson("cds_cluster"),
}));

// Returned from the mocked createCds_() below; presumably the cluster manager takes ownership
// of this raw pointer (confirm against the factory contract).
MockCdsApi* cds = new MockCdsApi();
std::shared_ptr<MockClusterMockPrioritySet> cds_cluster(
new NiceMock<MockClusterMockPrioritySet>());
cds_cluster->info_->name_ = "cds_cluster";

// This part tests static init.
// All EXPECT_CALLs from here on must be satisfied in the order they are declared.
InSequence s;
EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _))
.WillOnce(Return(std::make_pair(cds_cluster, nullptr)));
ON_CALL(*cds_cluster, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary));
EXPECT_CALL(factory_, createCds_()).WillOnce(Return(cds));
EXPECT_CALL(*cds, setInitializedCb(_));
EXPECT_CALL(*cds_cluster, initialize(_));

create(parseBootstrapFromV3Json(json));

// Records whether/when the cluster manager reports that it is fully initialized.
ReadyWatcher cm_initialized;
cluster_manager_->setInitializedCb([&]() -> void { cm_initialized.ready(); });

// STATIC definition of "fake_cluster" with a literal IP endpoint. Used below as the update that
// is expected to become ready immediately.
const std::string ready_cluster_yaml = R"EOF(
name: fake_cluster
connect_timeout: 0.250s
type: STATIC
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: fake_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 11001
)EOF";

// STRICT_DNS definition of the same "fake_cluster" name. Used first, as the cluster that is
// expected to stay in the warming state.
const std::string warming_cluster_yaml = R"EOF(
name: fake_cluster
connect_timeout: 0.250s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: fake_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: foo.com
port_value: 11001
)EOF";

{
SCOPED_TRACE("Add a primary cluster staying in warming.");
EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _));
EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(warming_cluster_yaml),
"warming"));

// Mark all the rest of the clusters ready. Now the only warming cluster is the above one.
// The manager must not report initialization complete while that cluster is still warming.
EXPECT_CALL(cm_initialized, ready()).Times(0);
cds_cluster->initialize_callback_();
}

{
SCOPED_TRACE("Modify the only warming primary cluster to immediate ready.");
EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _));
// Replacing the warming cluster is expected to let initialization proceed to the CDS API.
EXPECT_CALL(*cds, initialize());
EXPECT_TRUE(
cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(ready_cluster_yaml), "ready"));
}
{
SCOPED_TRACE("All clusters are ready.");
// Completing CDS initialization is what finally fires the manager's initialized callback.
EXPECT_CALL(cm_initialized, ready());
cds->initialized_callback_();
}
EXPECT_TRUE(Mock::VerifyAndClearExpectations(cds_cluster.get()));
}

TEST_F(ClusterManagerImplTest, ModifyWarmingCluster) {
time_system_.setSystemTime(std::chrono::milliseconds(1234567891234));
create(defaultConfig());
Expand Down Expand Up @@ -2984,6 +3083,33 @@ TEST_F(ClusterManagerInitHelperTest, StaticSdsInitialize) {
cluster1.initialize_callback_();
}

// Verify that a primary cluster can be replaced in the init helper while the original is still
// initializing (warming), and that only the replacement's completion is reported.
TEST_F(ClusterManagerInitHelperTest, TestUpdateWarming) {
// Expectations below must be satisfied in declaration order.
InSequence s;

// Register the original primary cluster. Its initialize callback is never invoked in this test,
// so it remains in the initializing state until it is dropped.
auto sds = std::make_unique<NiceMock<MockClusterMockPrioritySet>>();
ON_CALL(*sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary));
EXPECT_CALL(*sds, initialize(_));
init_helper_.addCluster(*sds);
init_helper_.onStaticLoadComplete();

// Add the replacement cluster while the original is still pending.
// NOTE(review): presumably both mocks carry the same default mock cluster name, which is what
// makes this an update of the first entry rather than a new cluster -- confirm in the mock.
NiceMock<MockClusterMockPrioritySet> updated_sds;
ON_CALL(updated_sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary));
EXPECT_CALL(updated_sds, initialize(_));
init_helper_.addCluster(updated_sds);

// The override cluster is added. Manually drop the previous cluster. In production flow this is
// achieved by ClusterManagerImpl.
sds.reset();

ReadyWatcher primary_initialized;
init_helper_.setPrimaryClustersInitializedCb([&]() -> void { primary_initialized.ready(); });

// Completing the replacement cluster should report that cluster via onClusterInit and then
// signal that primary cluster initialization has finished.
EXPECT_CALL(*this, onClusterInit(Ref(updated_sds)));
EXPECT_CALL(primary_initialized, ready());
updated_sds.initialize_callback_();
}

TEST_F(ClusterManagerInitHelperTest, UpdateAlreadyInitialized) {
InSequence s;

Expand Down Expand Up @@ -3087,6 +3213,7 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) {
init_helper_.addCluster(cluster1);

NiceMock<MockClusterMockPrioritySet> cluster2;
cluster2.info_->name_ = "cluster2";
ON_CALL(cluster2, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary));
init_helper_.addCluster(cluster2);

Expand All @@ -3099,6 +3226,8 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) {
init_helper_.startInitializingSecondaryClusters();

NiceMock<MockClusterMockPrioritySet> cluster3;
cluster3.info_->name_ = "cluster3";

ON_CALL(cluster3, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary));
EXPECT_CALL(cluster3, initialize(_));
init_helper_.addCluster(cluster3);
Expand Down
5 changes: 3 additions & 2 deletions test/integration/ads_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ AdsIntegrationTest::AdsIntegrationTest(const envoy::config::core::v3::ApiVersion

void AdsIntegrationTest::TearDown() { cleanUpXdsConnection(); }

envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name) {
return ConfigHelper::buildCluster(name, "ROUND_ROBIN", api_version_);
envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name,
const std::string& lb_policy) {
return ConfigHelper::buildCluster(name, lb_policy, api_version_);
}

envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildTlsCluster(const std::string& name) {
Expand Down
Loading

0 comments on commit cd87e82

Please sign in to comment.