Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

CDS: remove warming cluster if CDS response desired #13997

Merged
merged 8 commits into from
Nov 13, 2020
Merged
6 changes: 6 additions & 0 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,12 @@ class ClusterManager {
*/
virtual ClusterInfoMap clusters() PURE;

/**
* @return absl::flat_hash_set<std::string> all current clusters names including warming and
* active.
*/
virtual absl::flat_hash_set<std::string> allClusterNames() PURE;

using ClusterSet = absl::flat_hash_set<std::string>;

/**
Expand Down
6 changes: 3 additions & 3 deletions source/common/upstream/cds_api_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,13 @@ CdsApiImpl::CdsApiImpl(const envoy::config::core::v3::ConfigSource& cds_config,

void CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& resources,
const std::string& version_info) {
ClusterManager::ClusterInfoMap clusters_to_remove = cm_.clusters();
auto clusters_to_remove = cm_.allClusterNames();
std::vector<envoy::config::cluster::v3::Cluster> clusters;
for (const auto& resource : resources) {
clusters_to_remove.erase(resource.get().name());
}
Protobuf::RepeatedPtrField<std::string> to_remove_repeated;
for (const auto& [cluster_name, _] : clusters_to_remove) {
for (const auto& cluster_name : clusters_to_remove) {
*to_remove_repeated.Add() = cluster_name;
}
onConfigUpdate(resources, to_remove_repeated, version_info);
Expand All @@ -64,7 +64,7 @@ void CdsApiImpl::onConfigUpdate(const std::vector<Config::DecodedResourceRef>& a
removed_resources.size());

std::vector<std::string> exception_msgs;
absl::node_hash_set<std::string> cluster_names;
absl::flat_hash_set<std::string> cluster_names(added_resources.size());
bool any_applied = false;
for (const auto& resource : added_resources) {
envoy::config::cluster::v3::Cluster cluster;
Expand Down
14 changes: 14 additions & 0 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -223,6 +223,20 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

return clusters_map;
}

absl::flat_hash_set<std::string> allClusterNames() override {
absl::flat_hash_set<std::string> clusters_names(active_clusters_.size() +
warming_clusters_.size());

for (const auto& [name, _] : active_clusters_) {
clusters_names.emplace(name);
}
for (const auto& [name, _] : warming_clusters_) {
clusters_names.emplace(name);
}
return clusters_names;
}

const ClusterSet& primaryClusters() override { return primary_clusters_; }
ThreadLocalCluster* get(absl::string_view cluster) override;

Expand Down
25 changes: 15 additions & 10 deletions test/common/upstream/cds_api_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,11 @@ class CdsApiImplTest : public testing::Test {
return map;
}

absl::flat_hash_set<std::string> makeAllClusterNames(const std::vector<std::string>& clusters) {
return absl::flat_hash_set<std::string>(clusters.begin(), clusters.end());
}

NiceMock<MockClusterManager> cm_;
Upstream::ClusterManager::ClusterInfoMap cluster_map_;
Upstream::MockClusterMockPrioritySet mock_cluster_;
Stats::IsolatedStoreImpl store_;
CdsApiPtr cds_;
Expand Down Expand Up @@ -92,7 +95,7 @@ version_info: '0'
auto response1 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response1_yaml);

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set<std::string>{}));
expectAdd("cluster1", "0");
EXPECT_CALL(initialized_, ready());
EXPECT_EQ("", cds_->versionInfo());
Expand All @@ -108,7 +111,8 @@ version_info: '1'
)EOF";
auto response2 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response2_yaml);
EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterMap({"cluster1"})));
EXPECT_CALL(cm_, allClusterNames())
.WillOnce(Return(absl::flat_hash_set<std::string>{"cluster1"}));
EXPECT_CALL(cm_, removeCluster("cluster1")).WillOnce(Return(true));
const auto decoded_resources_2 =
TestUtility::decodeResources<envoy::config::cluster::v3::Cluster>(response2);
Expand All @@ -126,7 +130,7 @@ TEST_F(CdsApiImplTest, ValidateDuplicateClusters) {
cluster_1.set_name("duplicate_cluster");
const auto decoded_resources = TestUtility::decodeResources({cluster_1, cluster_1});

EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_));
EXPECT_CALL(cm_, allClusterNames()).WillRepeatedly(Return(absl::flat_hash_set<std::string>{}));
EXPECT_CALL(initialized_, ready());
EXPECT_THROW_WITH_MESSAGE(cds_callbacks_->onConfigUpdate(decoded_resources.refvec_, ""),
EnvoyException,
Expand All @@ -139,7 +143,7 @@ TEST_F(CdsApiImplTest, EmptyConfigUpdate) {

setup();

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set<std::string>{}));
EXPECT_CALL(initialized_, ready());

cds_callbacks_->onConfigUpdate({}, "");
Expand All @@ -151,7 +155,7 @@ TEST_F(CdsApiImplTest, ConfigUpdateWith2ValidClusters) {
setup();
}

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set<std::string>{}));
EXPECT_CALL(initialized_, ready());

envoy::config::cluster::v3::Cluster cluster_1;
Expand Down Expand Up @@ -224,7 +228,7 @@ TEST_F(CdsApiImplTest, ConfigUpdateAddsSecondClusterEvenIfFirstThrows) {
setup();
}

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set<std::string>{}));
EXPECT_CALL(initialized_, ready());

envoy::config::cluster::v3::Cluster cluster_1;
Expand Down Expand Up @@ -269,7 +273,7 @@ version_info: '0'
auto response1 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response1_yaml);

EXPECT_CALL(cm_, clusters()).WillOnce(Return(ClusterManager::ClusterInfoMap{}));
EXPECT_CALL(cm_, allClusterNames()).WillOnce(Return(absl::flat_hash_set<std::string>{}));
expectAdd("cluster1", "0");
expectAdd("cluster2", "0");
EXPECT_CALL(initialized_, ready());
Expand Down Expand Up @@ -298,7 +302,8 @@ version_info: '1'
auto response2 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response2_yaml);

EXPECT_CALL(cm_, clusters()).WillOnce(Return(makeClusterMap({"cluster1", "cluster2"})));
EXPECT_CALL(cm_, allClusterNames())
.WillOnce(Return(absl::flat_hash_set<std::string>{"cluster1", "cluster2"}));
expectAdd("cluster1", "1");
expectAdd("cluster3", "1");
EXPECT_CALL(cm_, removeCluster("cluster2"));
Expand Down Expand Up @@ -334,7 +339,7 @@ version_info: '0'
auto response1 =
TestUtility::parseYaml<envoy::service::discovery::v3::DiscoveryResponse>(response1_yaml);

EXPECT_CALL(cm_, clusters()).WillRepeatedly(Return(cluster_map_));
EXPECT_CALL(cm_, allClusterNames()).WillRepeatedly(Return(absl::flat_hash_set<std::string>{}));
EXPECT_CALL(initialized_, ready());
const auto decoded_resources =
TestUtility::decodeResources<envoy::config::cluster::v3::Cluster>(response1);
Expand Down
95 changes: 89 additions & 6 deletions test/integration/ads_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -553,8 +553,9 @@ TEST_P(AdsIntegrationTest, CdsPausedDuringWarming) {

// Send the second warming cluster.
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
Config::TypeUrl::get().Cluster, {buildCluster("warming_cluster_2")},
{buildCluster("warming_cluster_2")}, {}, "3");
Config::TypeUrl::get().Cluster,
{buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")},
{buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, {}, "3");
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2);
// We would've got a Cluster discovery request with version 2 here, had the CDS not been paused.

Expand Down Expand Up @@ -586,6 +587,87 @@ TEST_P(AdsIntegrationTest, CdsPausedDuringWarming) {
{"warming_cluster_2", "warming_cluster_1"}, {}, {}));
}

TEST_P(AdsIntegrationTest, RemoveWarmingCluster) {
initialize();

// Send initial configuration, validate we can process a request.
EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "", {}, {}, {}, true));
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster,
{buildCluster("cluster_0")},
{buildCluster("cluster_0")}, {}, "1");
EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "",
{"cluster_0"}, {"cluster_0"}, {}));

sendDiscoveryResponse<envoy::config::endpoint::v3::ClusterLoadAssignment>(
Config::TypeUrl::get().ClusterLoadAssignment, {buildClusterLoadAssignment("cluster_0")},
{buildClusterLoadAssignment("cluster_0")}, {}, "1");

EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "1", {}, {}, {}));
EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "", {}, {}, {}));
sendDiscoveryResponse<envoy::config::listener::v3::Listener>(
Config::TypeUrl::get().Listener, {buildListener("listener_0", "route_config_0")},
{buildListener("listener_0", "route_config_0")}, {}, "1");

EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1",
{"cluster_0"}, {}, {}));
EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "",
{"route_config_0"}, {"route_config_0"}, {}));
sendDiscoveryResponse<envoy::config::route::v3::RouteConfiguration>(
Config::TypeUrl::get().RouteConfiguration, {buildRouteConfig("route_config_0", "cluster_0")},
{buildRouteConfig("route_config_0", "cluster_0")}, {}, "1");

EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Listener, "1", {}, {}, {}));
EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().RouteConfiguration, "1",
{"route_config_0"}, {}, {}));

test_server_->waitForCounterGe("listener_manager.listener_create_success", 1);
makeSingleRequest();

// Send the first warming cluster.
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
Config::TypeUrl::get().Cluster, {buildCluster("warming_cluster_1")},
{buildCluster("warming_cluster_1")}, {"cluster_0"}, "2");

test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1);

EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1",
{"warming_cluster_1"}, {"warming_cluster_1"}, {"cluster_0"}));

// Send the second warming cluster and remove the first cluster.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the core of this test case.

sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(Config::TypeUrl::get().Cluster,
{buildCluster("warming_cluster_2")},
{buildCluster("warming_cluster_2")},
// Delta: remove warming_cluster_1.
{"warming_cluster_1"}, "3");
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1);
// We would've got a Cluster discovery request with version 2 here, had the CDS not been paused.

EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1",
{"warming_cluster_2"}, {"warming_cluster_2"},
{"warming_cluster_1"}));

// Finish warming the clusters. Note that the first warming cluster is not included in the
// response.
sendDiscoveryResponse<envoy::config::endpoint::v3::ClusterLoadAssignment>(
Config::TypeUrl::get().ClusterLoadAssignment,
{buildClusterLoadAssignment("warming_cluster_2")},
{buildClusterLoadAssignment("warming_cluster_2")}, {"cluster_0"}, "2");

// Validate that all clusters are warmed.
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0);
test_server_->waitForGaugeEq("cluster_manager.active_clusters", 3);

// CDS is resumed and EDS response was acknowledged.
if (sotw_or_delta_ == Grpc::SotwOrDelta::Delta) {
// Envoy will ACK both Cluster messages. Since they arrived while CDS was paused, they aren't
// sent until CDS is unpaused. Since version 3 has already arrived by the time the version 2
// ACK goes out, they're both acknowledging version 3.
EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "3", {}, {}, {}));
}
EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().Cluster, "3", {}, {}, {}));
EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "2",
{"warming_cluster_2"}, {}, {}));
}
// Validate that warming listeners are removed when left out of SOTW update.
TEST_P(AdsIntegrationTest, RemoveWarmingListener) {
initialize();
Expand Down Expand Up @@ -696,8 +778,9 @@ TEST_P(AdsIntegrationTest, ClusterWarmingOnNamedResponse) {

// Send the second warming cluster.
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
Config::TypeUrl::get().Cluster, {buildCluster("warming_cluster_2")},
{buildCluster("warming_cluster_2")}, {}, "3");
Config::TypeUrl::get().Cluster,
{buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")},
{buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, {}, "3");
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2);

EXPECT_TRUE(compareDiscoveryRequest(Config::TypeUrl::get().ClusterLoadAssignment, "1",
Expand Down Expand Up @@ -1359,8 +1442,8 @@ TEST_P(AdsClusterV2Test, CdsPausedDuringWarming) {

// Send the second warming cluster.
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
cds_type_url, {buildCluster("warming_cluster_2")}, {buildCluster("warming_cluster_2")}, {},
"3", true);
cds_type_url, {buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")},
{buildCluster("warming_cluster_1"), buildCluster("warming_cluster_2")}, {}, "3", true);
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2);
// We would've got a Cluster discovery request with version 2 here, had the CDS not been paused.

Expand Down
2 changes: 2 additions & 0 deletions test/mocks/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class MockClusterManager : public ClusterManager {
MOCK_METHOD(void, initializeSecondaryClusters,
(const envoy::config::bootstrap::v3::Bootstrap& bootstrap));
MOCK_METHOD(ClusterInfoMap, clusters, ());
MOCK_METHOD(absl::flat_hash_set<std::string>, allClusterNames, ());

MOCK_METHOD(const ClusterSet&, primaryClusters, ());
MOCK_METHOD(ThreadLocalCluster*, get, (absl::string_view cluster));
MOCK_METHOD(Http::ConnectionPool::Instance*, httpConnPoolForCluster,
Expand Down