Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDS: remove warming cluster if CDS response desired #13997

Merged
merged 8 commits into from
Nov 13, 2020
Merged
9 changes: 6 additions & 3 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,15 @@ class ClusterManager {
initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) PURE;

using ClusterInfoMap = absl::node_hash_map<std::string, std::reference_wrapper<const Cluster>>;
// Pair of name-keyed cluster maps returned together by clusters(): one map for
// active clusters and one for warming clusters. Member order matters to callers
// that brace-initialize this struct positionally (active first, then warming).
struct ClusterInfoMaps {
// Clusters currently in the active set, keyed by cluster name.
ClusterInfoMap active_clusters_;
// Clusters currently in the warming set, keyed by cluster name. The same name
// may also be present in active_clusters_.
ClusterInfoMap warming_clusters_;
};

/**
* @return ClusterInfoMap all current clusters. These are the primary (not thread local)
* clusters which should only be used for stats/admin.
* @return ClusterInfoMaps all current clusters including active and warming.
*/
virtual ClusterInfoMap clusters() PURE;
virtual ClusterInfoMaps clusters() PURE;

using ClusterSet = absl::flat_hash_set<std::string>;

Expand Down
6 changes: 3 additions & 3 deletions source/common/grpc/async_client_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@ AsyncClientFactoryImpl::AsyncClientFactoryImpl(Upstream::ClusterManager& cm,
}

const std::string& cluster_name = config.envoy_grpc().cluster_name();
auto clusters = cm_.clusters();
const auto& it = clusters.find(cluster_name);
if (it == clusters.end()) {
auto all_clusters = cm_.clusters();
const auto& it = all_clusters.active_clusters_.find(cluster_name);
if (it == all_clusters.active_clusters_.end()) {
throw EnvoyException(fmt::format("Unknown gRPC client cluster '{}'", cluster_name));
}
if (it->second.get().info()->addedViaApi()) {
Expand Down
17 changes: 12 additions & 5 deletions source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,15 +38,22 @@ CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config,

void CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) {
ClusterManager::ClusterInfoMap clusters_to_remove = cm_.clusters();
std::vector<envoy::config::cluster::v3::Cluster> clusters;
auto all_existing_clusters = cm_.clusters();
// Exclude the clusters which CDS wants to add.
for (const auto& resource : resources) {
clusters_to_remove.erase(resource.get().name());
all_existing_clusters.active_clusters_.erase(resource.get().name());
all_existing_clusters.warming_clusters_.erase(resource.get().name());
}
Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
for (const auto& [cluster_name, _] : clusters_to_remove) {
for (const auto& [cluster_name, _] : all_existing_clusters.active_clusters_) {
*to_remove_repeated.Add() = cluster_name;
}
for (const auto& [cluster_name, _] : all_existing_clusters.warming_clusters_) {
// Do not add the cluster twice when the cluster is both active and warming.
if (all_existing_clusters.active_clusters_.count(cluster_name) == 0) {
*to_remove_repeated.Add() = cluster_name;
}
}
onConfigUpdate(resources, to_remove_repeated, version_info);
}

Expand All @@ -64,7 +71,7 @@ void CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& a
removed_resources.size());

std::vector<std::string> exception_msgs;
absl::node_hash_set<std::string> cluster_names;
absl::flat_hash_set<std::string> cluster_names(added_resources.size());
bool any_applied = false;
for (const auto& resource : added_resources) {
envoy::config::cluster::v3::Cluster cluster;
Expand Down
14 changes: 8 additions & 6 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -236,15 +236,17 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
init_helper_.setInitializedCb(callback);
}

ClusterInfoMap clusters() override {
// TODO(mattklein123): Add ability to see warming clusters in admin output.
ClusterInfoMap clusters_map;
ClusterInfoMaps clusters() override {
ClusterInfoMaps clusters_maps;
for (auto& cluster : active_clusters_) {
clusters_map.emplace(cluster.first, *cluster.second->cluster_);
clusters_maps.active_clusters_.emplace(cluster.first, *cluster.second->cluster_);
}

return clusters_map;
for (auto& cluster : warming_clusters_) {
clusters_maps.warming_clusters_.emplace(cluster.first, *cluster.second->cluster_);
}
return clusters_maps;
}

const ClusterSet& primaryClusters() override { return primary_clusters_; }
ThreadLocalCluster* get(absl::string_view cluster) override;

Expand Down
17 changes: 10 additions & 7 deletions source/common/upstream/load_stats_reporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ void LoadStatsReporter::sendLoadStatsRequest() {
// added to the cluster manager. When we get the notification, we record the current time in
// clusters_ as the start time for the load reporting window for that cluster.
request_.mutable_cluster_stats()->Clear();
auto all_clusters = cm_.clusters();
for (const auto& cluster_name_and_timestamp : clusters_) {
const std::string& cluster_name = cluster_name_and_timestamp.first;
auto cluster_info_map = cm_.clusters();
auto it = cluster_info_map.find(cluster_name);
if (it == cluster_info_map.end()) {
auto it = all_clusters.active_clusters_.find(cluster_name);
if (it == all_clusters.active_clusters_.end()) {
ENVOY_LOG(debug, "Cluster {} does not exist", cluster_name);
continue;
}
Expand Down Expand Up @@ -154,7 +154,8 @@ void LoadStatsReporter::startLoadReportPeriod() {
// converge.
absl::node_hash_map<std::string, std::chrono::steady_clock::duration> existing_clusters;
if (message_->send_all_clusters()) {
for (const auto& p : cm_.clusters()) {
auto cluster_info_map = cm_.clusters();
for (const auto& p : cluster_info_map.active_clusters_) {
const std::string& cluster_name = p.first;
if (clusters_.count(cluster_name) > 0) {
existing_clusters.emplace(cluster_name, clusters_[cluster_name]);
Expand All @@ -173,9 +174,10 @@ void LoadStatsReporter::startLoadReportPeriod() {
clusters_.emplace(cluster_name, existing_clusters.count(cluster_name) > 0
? existing_clusters[cluster_name]
: time_source_.monotonicTime().time_since_epoch());
// TODO(lambdai): Move the clusters() call out of this lambda.
auto cluster_info_map = cm_.clusters();
auto it = cluster_info_map.find(cluster_name);
if (it == cluster_info_map.end()) {
auto it = cluster_info_map.active_clusters_.find(cluster_name);
if (it == cluster_info_map.active_clusters_.end()) {
return;
}
// Don't reset stats for existing tracked clusters.
Expand All @@ -193,7 +195,8 @@ void LoadStatsReporter::startLoadReportPeriod() {
cluster.info()->loadReportStats().upstream_rq_dropped_.latch();
};
if (message_->send_all_clusters()) {
for (const auto& p : cm_.clusters()) {
auto cluster_info_map = cm_.clusters();
for (const auto& p : cluster_info_map.active_clusters_) {
const std::string& cluster_name = p.first;
handle_cluster_func(cluster_name);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,9 +133,9 @@ bool ClusterRefreshManagerImpl::onEvent(const std::string& cluster_name, EventTy
if (post_callback) {
main_thread_dispatcher_.post([this, cluster_name, info]() {
// Ensure that cluster is still active before calling callback.
auto map = cm_.clusters();
auto it = map.find(cluster_name);
if (it != map.end()) {
auto maps = cm_.clusters();
auto it = maps.active_clusters_.find(cluster_name);
if (it != maps.active_clusters_.end()) {
info->cb_();
}
});
Expand Down
8 changes: 4 additions & 4 deletions source/extensions/stat_sinks/hystrix/hystrix.cc
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ void HystrixSink::flush(Stats::MetricSnapshot& snapshot) {
}
incCounter();
std::stringstream ss;
Upstream::ClusterManager::ClusterInfoMap clusters = server_.clusterManager().clusters();
Upstream::ClusterManager::ClusterInfoMaps all_clusters = server_.clusterManager().clusters();

// Save a map of the relevant histograms per cluster in a convenient format.
absl::node_hash_map<std::string, QuantileLatencyMap> time_histograms;
Expand Down Expand Up @@ -370,7 +370,7 @@ void HystrixSink::flush(Stats::MetricSnapshot& snapshot) {
}
}

for (auto& cluster : clusters) {
for (auto& cluster : all_clusters.active_clusters_) {
Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.second.get().info();

std::unique_ptr<ClusterStatsCache>& cluster_stats_cache_ptr =
Expand Down Expand Up @@ -407,9 +407,9 @@ void HystrixSink::flush(Stats::MetricSnapshot& snapshot) {
}

// check if any clusters were removed, and remove from cache
if (clusters.size() < cluster_stats_cache_map_.size()) {
if (all_clusters.active_clusters_.size() < cluster_stats_cache_map_.size()) {
for (auto it = cluster_stats_cache_map_.begin(); it != cluster_stats_cache_map_.end();) {
if (clusters.find(it->first) == clusters.end()) {
if (all_clusters.active_clusters_.find(it->first) == all_clusters.active_clusters_.end()) {
auto next_it = std::next(it);
cluster_stats_cache_map_.erase(it);
it = next_it;
Expand Down
8 changes: 6 additions & 2 deletions source/server/admin/clusters_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ void setHealthFlag(Upstream::Host::HealthFlag flag, const Upstream::Host& host,
// TODO(efimki): Add support of text readouts stats.
void ClustersHandler::writeClustersAsJson(Buffer::Instance& response) {
envoy::admin::v3::Clusters clusters;
for (const auto& [name, cluster_ref] : server_.clusterManager().clusters()) {
// TODO(mattklein123): Add ability to see warming clusters in admin output.
auto all_clusters = server_.clusterManager().clusters();
for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) {
const Upstream::Cluster& cluster = cluster_ref.get();
Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.info();

Expand Down Expand Up @@ -184,7 +186,9 @@ void ClustersHandler::writeClustersAsJson(Buffer::Instance& response) {

// TODO(efimki): Add support of text readouts stats.
void ClustersHandler::writeClustersAsText(Buffer::Instance& response) {
for (const auto& [name, cluster_ref] : server_.clusterManager().clusters()) {
// TODO(mattklein123): Add ability to see warming clusters in admin output.
auto all_clusters = server_.clusterManager().clusters();
for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) {
const Upstream::Cluster& cluster = cluster_ref.get();
const std::string& cluster_name = cluster.info()->name();
addOutlierInfo(cluster_name, cluster.outlierDetector(), response);
Expand Down
13 changes: 9 additions & 4 deletions source/server/admin/config_dump_handler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ ConfigDumpHandler::addResourceToDump(envoy::admin::v3::ConfigDump& dump,
const std::string& resource, bool include_eds) const {
Envoy::Server::ConfigTracker::CbsMap callbacks_map = config_tracker_.getCallbacksMap();
if (include_eds) {
if (!server_.clusterManager().clusters().empty()) {
// TODO(mattklein123): Add ability to see warming clusters in admin output.
auto all_clusters = server_.clusterManager().clusters();
if (!all_clusters.active_clusters_.empty()) {
callbacks_map.emplace("endpoint", [this] { return dumpEndpointConfigs(); });
}
}
Expand Down Expand Up @@ -195,7 +197,9 @@ void ConfigDumpHandler::addAllConfigToDump(envoy::admin::v3::ConfigDump& dump,
bool include_eds) const {
Envoy::Server::ConfigTracker::CbsMap callbacks_map = config_tracker_.getCallbacksMap();
if (include_eds) {
if (!server_.clusterManager().clusters().empty()) {
// TODO(mattklein123): Add ability to see warming clusters in admin output.
auto all_clusters = server_.clusterManager().clusters();
if (!all_clusters.active_clusters_.empty()) {
callbacks_map.emplace("endpoint", [this] { return dumpEndpointConfigs(); });
}
}
Expand All @@ -220,8 +224,9 @@ void ConfigDumpHandler::addAllConfigToDump(envoy::admin::v3::ConfigDump& dump,

ProtobufTypes::MessagePtr ConfigDumpHandler::dumpEndpointConfigs() const {
auto endpoint_config_dump = std::make_unique<envoy::admin::v3::EndpointsConfigDump>();

for (const auto& [name, cluster_ref] : server_.clusterManager().clusters()) {
// TODO(mattklein123): Add ability to see warming clusters in admin output.
auto all_clusters = server_.clusterManager().clusters();
for (const auto& [name, cluster_ref] : all_clusters.active_clusters_) {
UNREFERENCED_PARAMETER(name);
const Upstream::Cluster& cluster = cluster_ref.get();
Upstream::ClusterInfoConstSharedPtr cluster_info = cluster.info();
Expand Down
9 changes: 5 additions & 4 deletions test/common/grpc/async_client_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ TEST_F(AsyncClientManagerImplTest, EnvoyGrpcOk) {
envoy::config::core::v3::GrpcService grpc_service;
grpc_service.mutable_envoy_grpc()->set_cluster_name("foo");

Upstream::ClusterManager::ClusterInfoMap cluster_map;
Upstream::ClusterManager::ClusterInfoMaps cluster_maps;
Upstream::MockClusterMockPrioritySet cluster;
cluster_map.emplace("foo", cluster);
EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_map));
cluster_maps.active_clusters_.emplace("foo", cluster);
EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_maps));
EXPECT_CALL(cluster, info());
EXPECT_CALL(*cluster.info_, addedViaApi());

Expand All @@ -65,7 +65,8 @@ TEST_F(AsyncClientManagerImplTest, EnvoyGrpcDynamicCluster) {
Upstream::ClusterManager::ClusterInfoMap cluster_map;
Upstream::MockClusterMockPrioritySet cluster;
cluster_map.emplace("foo", cluster);
EXPECT_CALL(cm_, clusters()).WillOnce(Return(cluster_map));
EXPECT_CALL(cm_, clusters())
.WillOnce(Return(Upstream::ClusterManager::ClusterInfoMaps{cluster_map, {}}));
EXPECT_CALL(cluster, info());
EXPECT_CALL(*cluster.info_, addedViaApi()).WillOnce(Return(true));
EXPECT_THROW_WITH_MESSAGE(
Expand Down
34 changes: 19 additions & 15 deletions test/common/upstream/cds_api_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -54,16 +54,20 @@ class CdsApiImplTest : public testing::Test {
.WillOnce(Throw(EnvoyException(exception_msg)));
}

ClusterManager::ClusterInfoMap makeClusterMap(const std::vector<std::string>& clusters) {
ClusterManager::ClusterInfoMap map;
for (const auto& cluster : clusters) {
map.emplace(cluster, cm_.thread_local_cluster_.cluster_);
ClusterManager::ClusterInfoMaps
makeClusterInfoMaps(const std::vector<std::string>& active_clusters,
const std::vector<std::string>& warming_clusters = {}) {
ClusterManager::ClusterInfoMaps maps;
for (const auto& cluster : active_clusters) {
maps.active_clusters_.emplace(cluster, cm_.thread_local_cluster_.cluster_);
}
return map;
for (const auto& cluster : warming_clusters) {
maps.warming_clusters_.emplace(cluster, cm_.thread_local_cluster_.cluster_);
}
return maps;
}

NiceMock<MockClusterManager> cm_;
Upstream::ClusterManager::ClusterInfoMap cluster_map_;
Upstream::MockClusterMockPrioritySet mock_cluster_;
Stats::IsolatedStoreImpl store_;
CdsApiPtr cds_;
Expand Down Expand Up @@ -92,7 +96,7 @@ version_info: '0'
auto response1 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response1_yaml);

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({})));
expectAdd("cluster1", "0");
EXPECT_CALL(initialized_, ready());
EXPECT_EQ("", cds_->versionInfo());
Expand All @@ -108,7 +112,7 @@ version_info: '1'
)EOF";
auto response2 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response2_yaml);
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterMap({"cluster1"})));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({"cluster1"})));
EXPECT_CALL(cm_, removeCluster("cluster1")).WillOnce(Return(true));
const auto decoded_resources_2 =
TestUtility::decodeResources<envoy::config::cluster::v3::Cluster>(response2);
Expand All @@ -126,7 +130,7 @@ TEST_F(CdsApiImplTest, ValidateDuplicateClusters) {
cluster_1.set_name("duplicate_cluster");
const auto decoded_resources = TestUtility::decodeResources({cluster_1, cluster_1});

EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_));
EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(makeClusterInfoMaps({})));
EXPECT_CALL(initialized_, ready());
EXPECT_THROW_WITH_MESSAGE(cds_callbacks_->onConfigUpdate(decoded_resources.refvec_, ""),
EnvoyException,
Expand All @@ -139,7 +143,7 @@ TEST_F(CdsApiImplTest, EmptyConfigUpdate) {

setup();

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({})));
EXPECT_CALL(initialized_, ready());

cds_callbacks_->onConfigUpdate({}, "");
Expand All @@ -151,7 +155,7 @@ TEST_F(CdsApiImplTest, ConfigUpdateWith2ValidClusters) {
setup();
}

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({})));
EXPECT_CALL(initialized_, ready());

envoy::config::cluster::v3::Cluster cluster_1;
Expand Down Expand Up @@ -224,7 +228,7 @@ TEST_F(CdsApiImplTest, ConfigUpdateAddsSecondClusterEvenIfFirstThrows) {
setup();
}

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({})));
EXPECT_CALL(initialized_, ready());

envoy::config::cluster::v3::Cluster cluster_1;
Expand Down Expand Up @@ -269,7 +273,7 @@ version_info: '0'
auto response1 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response1_yaml);

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({})));
expectAdd("cluster1", "0");
expectAdd("cluster2", "0");
EXPECT_CALL(initialized_, ready());
Expand Down Expand Up @@ -298,7 +302,7 @@ version_info: '1'
auto response2 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response2_yaml);

EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterMap({"cluster1", "cluster2"})));
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterInfoMaps({"cluster1", "cluster2"})));
expectAdd("cluster1", "1");
expectAdd("cluster3", "1");
EXPECT_CALL(cm_, removeCluster("cluster2"));
Expand Down Expand Up @@ -334,7 +338,7 @@ version_info: '0'
auto response1 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response1_yaml);

EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_));
EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(makeClusterInfoMaps({})));
EXPECT_CALL(initialized_, ready());
const auto decoded_resources =
TestUtility::decodeResources<envoy::config::cluster::v3::Cluster>(response1);
Expand Down
6 changes: 3 additions & 3 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ TEST_F(ClusterManagerImplTest, ValidClusterName) {

create(parseBootstrapFromV3Yaml(yaml));
cluster_manager_->clusters()
.find("cluster:name")
.active_clusters_.find("cluster:name")
->second.get()
.info()
->statsScope()
Expand Down Expand Up @@ -1490,7 +1490,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) {
EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(update_cluster, ""));

EXPECT_EQ(cluster2->info_, cluster_manager_->get("fake_cluster")->info());
EXPECT_EQ(1UL, cluster_manager_->clusters().size());
EXPECT_EQ(1UL, cluster_manager_->clusters().active_clusters_.size());
Http::ConnectionPool::MockInstance* cp = new Http::ConnectionPool::MockInstance();
EXPECT_CALL(factory_, allocateConnPool_(_, _, _)).WillOnce(Return(cp));
EXPECT_EQ(cp, cluster_manager_->httpConnPoolForCluster("fake_cluster", ResourcePriority::Default,
Expand Down Expand Up @@ -1520,7 +1520,7 @@ TEST_F(ClusterManagerImplTest, DynamicAddRemove) {
EXPECT_CALL(*cp2, addDrainedCallback(_)).WillOnce(SaveArg<0>(&drained_cb2));
EXPECT_TRUE(cluster_manager_->removeCluster("fake_cluster"));
EXPECT_EQ(nullptr, cluster_manager_->get("fake_cluster"));
EXPECT_EQ(0UL, cluster_manager_->clusters().size());
EXPECT_EQ(0UL, cluster_manager_->clusters().active_clusters_.size());

// Close the TCP connection. Success is no ASSERT or crash due to referencing
// the removed cluster.
Expand Down
Loading