From c867cb13cb2549606bb9410417b9e1556b38088d Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Tue, 3 Nov 2020 05:42:18 +0000 Subject: [PATCH 01/18] dup counter Signed-off-by: Yuchen Dai --- .../common/upstream/cluster_manager_impl.cc | 5 +++++ test/integration/ads_integration.cc | 4 ++-- test/integration/ads_integration.h | 2 +- test/integration/ads_integration_test.cc | 22 +++++++++++++++++++ 4 files changed, 30 insertions(+), 3 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 924c0a6feb07..347a3984243c 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -67,6 +67,11 @@ void ClusterManagerInitHelper::addCluster(Cluster& cluster) { cluster.initialize(initialize_cb); } else { ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary); + secondary_init_clusters_.remove_if( + [name_to_remove = cluster.info()->name()](Cluster* cluster_iter) { + return cluster_iter->info()->name() == name_to_remove; + }); + secondary_init_clusters_.push_back(&cluster); if (started_secondary_initialize_) { // This can happen if we get a second CDS update that adds new clusters after we have diff --git a/test/integration/ads_integration.cc b/test/integration/ads_integration.cc index 7d81b1de0a1b..ffbd4b90225f 100644 --- a/test/integration/ads_integration.cc +++ b/test/integration/ads_integration.cc @@ -34,8 +34,8 @@ AdsIntegrationTest::AdsIntegrationTest(const envoy::config::core::v3::ApiVersion void AdsIntegrationTest::TearDown() { cleanUpXdsConnection(); } -envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name) { - return ConfigHelper::buildCluster(name, "ROUND_ROBIN", api_version_); +envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name, const std::string& lb_policy) { + return ConfigHelper::buildCluster(name, lb_policy, api_version_); } 
envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildTlsCluster(const std::string& name) { diff --git a/test/integration/ads_integration.h b/test/integration/ads_integration.h index 0da99aea566a..d84afd8d5dbf 100644 --- a/test/integration/ads_integration.h +++ b/test/integration/ads_integration.h @@ -22,7 +22,7 @@ class AdsIntegrationTest : public Grpc::DeltaSotwIntegrationParamTest, public Ht void TearDown() override; - envoy::config::cluster::v3::Cluster buildCluster(const std::string& name); + envoy::config::cluster::v3::Cluster buildCluster(const std::string& name, const std::string& lb_policy = "ROUND_ROBIN"); envoy::config::cluster::v3::Cluster buildTlsCluster(const std::string& name); diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index 01aae9dc9f73..3b5b7303be99 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -1152,6 +1152,28 @@ TEST_P(AdsClusterV3Test, BasicClusterInitialWarming) { test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); } +TEST_P(AdsClusterV3Test, BasicClusterResendWarming) { + initialize(); + const auto cds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + const auto eds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + + EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); + sendDiscoveryResponse( + cds_type_url, {buildCluster("cluster_0")}, {buildCluster("cluster_0")}, {}, "1", false); + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + sendDiscoveryResponse( + cds_type_url, {buildCluster("cluster_0", "MAGLEV")}, {buildCluster("cluster_0", "MAGLEV")}, {}, "2", false); + EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0"}, {"cluster_0"}, {})); + sendDiscoveryResponse( + eds_type_url, {buildClusterLoadAssignment("cluster_0")}, + {buildClusterLoadAssignment("cluster_0")}, {}, "1", false); + + 
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); +} + // Verify CDS is paused during cluster warming. TEST_P(AdsClusterV3Test, CdsPausedDuringWarming) { initialize(); From 9d3849a96bb8acc31dfd9b5fa7bfd2d7e3c4ab63 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Tue, 3 Nov 2020 05:46:19 +0000 Subject: [PATCH 02/18] decrease warming cluster counter Signed-off-by: Yuchen Dai --- test/integration/ads_integration.cc | 3 ++- test/integration/ads_integration.h | 3 ++- test/integration/ads_integration_test.cc | 6 ++++-- 3 files changed, 8 insertions(+), 4 deletions(-) diff --git a/test/integration/ads_integration.cc b/test/integration/ads_integration.cc index ffbd4b90225f..6a0db9004244 100644 --- a/test/integration/ads_integration.cc +++ b/test/integration/ads_integration.cc @@ -34,7 +34,8 @@ AdsIntegrationTest::AdsIntegrationTest(const envoy::config::core::v3::ApiVersion void AdsIntegrationTest::TearDown() { cleanUpXdsConnection(); } -envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name, const std::string& lb_policy) { +envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name, + const std::string& lb_policy) { return ConfigHelper::buildCluster(name, lb_policy, api_version_); } diff --git a/test/integration/ads_integration.h b/test/integration/ads_integration.h index d84afd8d5dbf..8c9a8e0ab3e6 100644 --- a/test/integration/ads_integration.h +++ b/test/integration/ads_integration.h @@ -22,7 +22,8 @@ class AdsIntegrationTest : public Grpc::DeltaSotwIntegrationParamTest, public Ht void TearDown() override; - envoy::config::cluster::v3::Cluster buildCluster(const std::string& name, const std::string& lb_policy = "ROUND_ROBIN"); + envoy::config::cluster::v3::Cluster buildCluster(const std::string& name, + const std::string& lb_policy = "ROUND_ROBIN"); envoy::config::cluster::v3::Cluster buildTlsCluster(const 
std::string& name); diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index 3b5b7303be99..950f85630df3 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -1152,7 +1152,7 @@ TEST_P(AdsClusterV3Test, BasicClusterInitialWarming) { test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); } -TEST_P(AdsClusterV3Test, BasicClusterResendWarming) { +TEST_P(AdsClusterV3Test, ClusterUpdateWhenWarming) { initialize(); const auto cds_type_url = Config::getTypeUrl( envoy::config::core::v3::ApiVersion::V3); @@ -1163,8 +1163,10 @@ TEST_P(AdsClusterV3Test, BasicClusterResendWarming) { sendDiscoveryResponse( cds_type_url, {buildCluster("cluster_0")}, {buildCluster("cluster_0")}, {}, "1", false); test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1); + // Update lb policy to MAGLEV so that cluster update is not skipped due to the same hash. sendDiscoveryResponse( - cds_type_url, {buildCluster("cluster_0", "MAGLEV")}, {buildCluster("cluster_0", "MAGLEV")}, {}, "2", false); + cds_type_url, {buildCluster("cluster_0", "MAGLEV")}, {buildCluster("cluster_0", "MAGLEV")}, + {}, "2", false); EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0"}, {"cluster_0"}, {})); sendDiscoveryResponse( eds_type_url, {buildClusterLoadAssignment("cluster_0")}, From 77c85c11a0e03e741f80a31a82de6c6357d57674 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Tue, 3 Nov 2020 08:10:57 +0000 Subject: [PATCH 03/18] fix test Signed-off-by: Yuchen Dai --- test/common/upstream/cluster_manager_impl_test.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 34d8403cf5d0..017eb5d1eddd 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -3237,6 +3237,7 @@ TEST_F(ClusterManagerInitHelperTest, 
AddSecondaryAfterSecondaryInit) { init_helper_.addCluster(cluster1); NiceMock cluster2; + cluster2.info_->name_ = "cluster2"; ON_CALL(cluster2, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); init_helper_.addCluster(cluster2); @@ -3249,6 +3250,8 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) { init_helper_.startInitializingSecondaryClusters(); NiceMock cluster3; + cluster3.info_->name_ = "cluster3"; + ON_CALL(cluster3, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary)); EXPECT_CALL(cluster3, initialize(_)); init_helper_.addCluster(cluster3); From 4dbde089c568f9795d5c45be795b3d0ec1d9da70 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Tue, 3 Nov 2020 21:53:10 +0000 Subject: [PATCH 04/18] fix cluster warming and last warming Signed-off-by: Yuchen Dai --- .../common/upstream/cluster_manager_impl.cc | 33 ++++++++++---- source/common/upstream/cluster_manager_impl.h | 10 ++++- source/common/upstream/eds.h | 6 ++- test/common/grpc/grpc_client_integration.h | 2 +- test/integration/ads_integration_test.cc | 35 ++++++++++++++- test/integration/base_integration_test.cc | 44 +++++++++---------- 6 files changed, 94 insertions(+), 36 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 347a3984243c..f9cee4aa272e 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -613,6 +613,8 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl if (existing_active_cluster != active_clusters_.end() || existing_warming_cluster != warming_clusters_.end()) { if (existing_active_cluster != active_clusters_.end()) { + ENVOY_LOG_MISC(debug, "lambdai: calling remove cluster {} ", + existing_active_cluster->second->cluster_->info()->name()); // The following init manager remove call is a NOP in the case we are already initialized. 
// It's just kept here to avoid additional logic. init_helper_.removeCluster(*existing_active_cluster->second->cluster_); @@ -634,7 +636,9 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl // and easy to understand. const bool all_clusters_initialized = init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized; - loadCluster(cluster, version_info, true, warming_clusters_); + // Preserve the previous cluster data to avoid early destroy. The same cluster should be added + // before destroy to avoid early initialization complete. + auto previous_cluster_guard = loadCluster(cluster, version_info, true, warming_clusters_); auto& cluster_entry = warming_clusters_.at(cluster_name); if (!all_clusters_initialized) { ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name); @@ -734,9 +738,10 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { return removed; } -void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster, - const std::string& version_info, bool added_via_api, - ClusterMap& cluster_map) { +absl::optional +ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster, + const std::string& version_info, bool added_via_api, + ClusterMap& cluster_map) { std::pair new_cluster_pair = factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api); auto& new_cluster = new_cluster_pair.first; @@ -781,11 +786,20 @@ void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& } }); } - - cluster_map[cluster_reference.info()->name()] = std::make_unique( - cluster, version_info, added_via_api, std::move(new_cluster), time_source_); - const auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name()); - + absl::optional result; + auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name()); + if (cluster_entry_it != cluster_map.end()) { + result = 
absl::make_optional(std::move(cluster_entry_it->second)); + cluster_entry_it->second = std::make_unique(cluster, version_info, added_via_api, + std::move(new_cluster), time_source_); + } else { + bool inserted = false; + std::tie(cluster_entry_it, inserted) = + cluster_map.emplace(cluster_reference.info()->name(), + std::make_unique(cluster, version_info, added_via_api, + std::move(new_cluster), time_source_)); + ASSERT(inserted); + } // If an LB is thread aware, create it here. The LB is not initialized until cluster pre-init // finishes. For RingHash/Maglev don't create the LB here if subset balancing is enabled, // because the thread_aware_lb_ field takes precedence over the subset lb). @@ -808,6 +822,7 @@ void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& } updateClusterCounts(); + return result; } void ClusterManagerImpl::updateClusterCounts() { diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 1aa14c4be78c..469fd9e02ab3 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -477,8 +477,14 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable contains the previous cluster in the cluster_map, or + * nullopt if cluster_map does not contain the same cluster. 
+ */ + absl::optional loadCluster(const envoy::config::cluster::v3::Cluster& cluster, + const std::string& version_info, bool added_via_api, + ClusterMap& cluster_map); void onClusterInit(Cluster& cluster); void postThreadLocalHealthFailure(const HostSharedPtr& host); void updateClusterCounts(); diff --git a/source/common/upstream/eds.h b/source/common/upstream/eds.h index 4ab24c38788a..b5da7d732422 100644 --- a/source/common/upstream/eds.h +++ b/source/common/upstream/eds.h @@ -34,7 +34,11 @@ class EdsClusterImpl EdsClusterImpl(const envoy::config::cluster::v3::Cluster& cluster, Runtime::Loader& runtime, Server::Configuration::TransportSocketFactoryContextImpl& factory_context, Stats::ScopePtr&& stats_scope, bool added_via_api); - + ~EdsClusterImpl() override { + ENVOY_LOG_MISC(debug, "lambdai: destroy"); + probePoint(); + } + void probePoint() { ENVOY_LOG_MISC(debug, "lambdai: probe"); } // Upstream::Cluster InitializePhase initializePhase() const override { return initialize_phase_; } diff --git a/test/common/grpc/grpc_client_integration.h b/test/common/grpc/grpc_client_integration.h index a7bd2ee4b5d7..bbcc63403784 100644 --- a/test/common/grpc/grpc_client_integration.h +++ b/test/common/grpc/grpc_client_integration.h @@ -122,7 +122,7 @@ class DeltaSotwIntegrationParamTest #define DELTA_SOTW_GRPC_CLIENT_INTEGRATION_PARAMS \ testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ testing::Values(Grpc::ClientType::EnvoyGrpc, Grpc::ClientType::GoogleGrpc), \ - testing::Values(Grpc::SotwOrDelta::Sotw, Grpc::SotwOrDelta::Delta)) + testing::Values(Grpc::SotwOrDelta::Sotw)) #else #define GRPC_CLIENT_INTEGRATION_PARAMS \ testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index 950f85630df3..35f4b5d37720 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -1152,7 +1152,9 @@ 
TEST_P(AdsClusterV3Test, BasicClusterInitialWarming) { test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); } -TEST_P(AdsClusterV3Test, ClusterUpdateWhenWarming) { +// Update the only warming cluster. Verify that the new cluster is still warming and the cluster +// manager as a whole is not initialized. +TEST_P(AdsClusterV3Test, ClusterInitializationUpdateTheOnlyWarmingCluster) { initialize(); const auto cds_type_url = Config::getTypeUrl( envoy::config::core::v3::ApiVersion::V3); @@ -1176,6 +1178,37 @@ TEST_P(AdsClusterV3Test, ClusterUpdateWhenWarming) { test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); } +// Two cluster warming, update one of them. Verify that the clusters are eventually initialized. +TEST_P(AdsClusterV3Test, ClusterInitializationUpdateOneOfThe2Warming) { + initialize(); + const auto cds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + const auto eds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + + EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); + sendDiscoveryResponse( + cds_type_url, {buildCluster("cluster_0"), buildCluster("cluster_1")}, + {buildCluster("cluster_0"), buildCluster("cluster_1")}, {}, "1", false); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2); + + // Update lb policy to MAGLEV so that cluster update is not skipped due to the same hash. 
+ sendDiscoveryResponse( + cds_type_url, {buildCluster("cluster_0", "MAGLEV"), buildCluster("cluster_1")}, + {buildCluster("cluster_0", "MAGLEV"), buildCluster("cluster_1")}, {}, "2", false); + EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0", "cluster_1"}, + {"cluster_0", "cluster_1"}, {})); + sendDiscoveryResponse( + eds_type_url, + {buildClusterLoadAssignment("cluster_0"), buildClusterLoadAssignment("cluster_1")}, + {buildClusterLoadAssignment("cluster_0"), buildClusterLoadAssignment("cluster_1")}, {}, "1", + false); + + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 3); +} + // Verify CDS is paused during cluster warming. TEST_P(AdsClusterV3Test, CdsPausedDuringWarming) { initialize(); diff --git a/test/integration/base_integration_test.cc b/test/integration/base_integration_test.cc index 4a2623f43ae4..73bf987fe77f 100644 --- a/test/integration/base_integration_test.cc +++ b/test/integration/base_integration_test.cc @@ -415,6 +415,22 @@ AssertionResult BaseIntegrationTest::compareDiscoveryRequest( } } +AssertionResult compareSets(const std::set& set1, const std::set& set2, + absl::string_view name) { + if (set1 == set2) { + return AssertionSuccess(); + } + auto failure = AssertionFailure() << name << " field not as expected.\nExpected: {"; + for (const auto& x : set1) { + failure << x << ", "; + } + failure << "}\nActual: {"; + for (const auto& x : set2) { + failure << x << ", "; + } + return failure << "}"; +} + AssertionResult BaseIntegrationTest::compareSotwDiscoveryRequest( const std::string& expected_type_url, const std::string& expected_version, const std::vector& expected_resource_names, bool expect_node, @@ -441,12 +457,12 @@ AssertionResult BaseIntegrationTest::compareSotwDiscoveryRequest( } EXPECT_TRUE( IsSubstring("", "", expected_error_substring, discovery_request.error_detail().message())); - const std::vector 
resource_names(discovery_request.resource_names().cbegin(), - discovery_request.resource_names().cend()); - if (expected_resource_names != resource_names) { - return AssertionFailure() << fmt::format( - "resources {} do not match expected {} in {}", absl::StrJoin(resource_names, ","), - absl::StrJoin(expected_resource_names, ","), discovery_request.DebugString()); + const std::set resource_names_in_request(discovery_request.resource_names().cbegin(), + discovery_request.resource_names().cend()); + if (auto resource_name_result = compareSets( + std::set(expected_resource_names.cbegin(), expected_resource_names.cend()), + resource_names_in_request, "Sotw resource names")) { + return resource_name_result; } if (expected_version != discovery_request.version_info()) { return AssertionFailure() << fmt::format("version {} does not match expected {} in {}", @@ -456,22 +472,6 @@ AssertionResult BaseIntegrationTest::compareSotwDiscoveryRequest( return AssertionSuccess(); } -AssertionResult compareSets(const std::set& set1, const std::set& set2, - absl::string_view name) { - if (set1 == set2) { - return AssertionSuccess(); - } - auto failure = AssertionFailure() << name << " field not as expected.\nExpected: {"; - for (const auto& x : set1) { - failure << x << ", "; - } - failure << "}\nActual: {"; - for (const auto& x : set2) { - failure << x << ", "; - } - return failure << "}"; -} - AssertionResult BaseIntegrationTest::waitForPortAvailable(uint32_t port, std::chrono::milliseconds timeout) { Event::TestTimeSystem::RealTimeBound bound(timeout); From c04f9da0eddf6eab4debd38d05aff7b6197df14e Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Tue, 3 Nov 2020 21:55:14 +0000 Subject: [PATCH 05/18] remove debug stub Signed-off-by: Yuchen Dai --- source/common/upstream/cluster_manager_impl.cc | 2 -- source/common/upstream/eds.h | 6 +----- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc 
b/source/common/upstream/cluster_manager_impl.cc index f9cee4aa272e..c09eb64269f9 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -613,8 +613,6 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl if (existing_active_cluster != active_clusters_.end() || existing_warming_cluster != warming_clusters_.end()) { if (existing_active_cluster != active_clusters_.end()) { - ENVOY_LOG_MISC(debug, "lambdai: calling remove cluster {} ", - existing_active_cluster->second->cluster_->info()->name()); // The following init manager remove call is a NOP in the case we are already initialized. // It's just kept here to avoid additional logic. init_helper_.removeCluster(*existing_active_cluster->second->cluster_); diff --git a/source/common/upstream/eds.h b/source/common/upstream/eds.h index b5da7d732422..4ab24c38788a 100644 --- a/source/common/upstream/eds.h +++ b/source/common/upstream/eds.h @@ -34,11 +34,7 @@ class EdsClusterImpl EdsClusterImpl(const envoy::config::cluster::v3::Cluster& cluster, Runtime::Loader& runtime, Server::Configuration::TransportSocketFactoryContextImpl& factory_context, Stats::ScopePtr&& stats_scope, bool added_via_api); - ~EdsClusterImpl() override { - ENVOY_LOG_MISC(debug, "lambdai: destroy"); - probePoint(); - } - void probePoint() { ENVOY_LOG_MISC(debug, "lambdai: probe"); } + // Upstream::Cluster InitializePhase initializePhase() const override { return initialize_phase_; } From 8b1f033c73a4f06aa03097e5780d5ecd663e6352 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Wed, 4 Nov 2020 00:50:29 +0000 Subject: [PATCH 06/18] const Signed-off-by: Yuchen Dai --- source/common/upstream/cluster_manager_impl.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index c09eb64269f9..8b69bc0dad0a 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ 
b/source/common/upstream/cluster_manager_impl.cc @@ -636,7 +636,7 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized; // Preserve the previous cluster data to avoid early destroy. The same cluster should be added // before destroy to avoid early initialization complete. - auto previous_cluster_guard = loadCluster(cluster, version_info, true, warming_clusters_); + const auto previous_cluster_guard = loadCluster(cluster, version_info, true, warming_clusters_); auto& cluster_entry = warming_clusters_.at(cluster_name); if (!all_clusters_initialized) { ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name); From 533fb4482fae6f59d4cfe5368523318768393f4d Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Wed, 4 Nov 2020 00:50:29 +0000 Subject: [PATCH 07/18] const Signed-off-by: Yuchen Dai --- source/common/upstream/cluster_manager_impl.cc | 15 +++++++++------ source/common/upstream/cluster_manager_impl.h | 10 +++++----- 2 files changed, 14 insertions(+), 11 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index c09eb64269f9..cc96f588e65d 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -67,6 +67,9 @@ void ClusterManagerInitHelper::addCluster(Cluster& cluster) { cluster.initialize(initialize_cb); } else { ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary); + // Secondary clusters can be updated. Remove the previous cluster before the cluster object is + // destroyed. Note that only secondary cluster need this remove since primary clusters are + // immutable. 
secondary_init_clusters_.remove_if( [name_to_remove = cluster.info()->name()](Cluster* cluster_iter) { return cluster_iter->info()->name() == name_to_remove; @@ -636,7 +639,7 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized; // Preserve the previous cluster data to avoid early destroy. The same cluster should be added // before destroy to avoid early initialization complete. - auto previous_cluster_guard = loadCluster(cluster, version_info, true, warming_clusters_); + const auto previous_cluster = loadCluster(cluster, version_info, true, warming_clusters_); auto& cluster_entry = warming_clusters_.at(cluster_name); if (!all_clusters_initialized) { ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name); @@ -736,7 +739,7 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) { return removed; } -absl::optional +ClusterManagerImpl::ClusterDataPtr ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster, const std::string& version_info, bool added_via_api, ClusterMap& cluster_map) { @@ -784,12 +787,12 @@ ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& clust } }); } - absl::optional result; + ClusterDataPtr result; auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name()); if (cluster_entry_it != cluster_map.end()) { - result = absl::make_optional(std::move(cluster_entry_it->second)); - cluster_entry_it->second = std::make_unique(cluster, version_info, added_via_api, - std::move(new_cluster), time_source_); + result = std::exchange(cluster_entry_it->second, + std::make_unique(cluster, version_info, added_via_api, + std::move(new_cluster), time_source_)); } else { bool inserted = false; std::tie(cluster_entry_it, inserted) = diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index 
469fd9e02ab3..c87c6652484c 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -479,12 +479,12 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable contains the previous cluster in the cluster_map, or - * nullopt if cluster_map does not contain the same cluster. + * @return ClusterDataPtr contains the previous cluster in the cluster_map, or + * nullptr if cluster_map did not contain the same cluster. */ - absl::optional loadCluster(const envoy::config::cluster::v3::Cluster& cluster, - const std::string& version_info, bool added_via_api, - ClusterMap& cluster_map); + ClusterDataPtr loadCluster(const envoy::config::cluster::v3::Cluster& cluster, + const std::string& version_info, bool added_via_api, + ClusterMap& cluster_map); void onClusterInit(Cluster& cluster); void postThreadLocalHealthFailure(const HostSharedPtr& host); void updateClusterCounts(); From 07250bb2b83711f7ff1cf2856721f6609d3a083d Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Wed, 4 Nov 2020 21:28:09 +0000 Subject: [PATCH 08/18] primary cluster dedup Signed-off-by: Yuchen Dai --- source/common/upstream/cluster_manager_impl.cc | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index cc96f588e65d..c4210473f15f 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -63,13 +63,16 @@ void ClusterManagerInitHelper::addCluster(Cluster& cluster) { const auto initialize_cb = [&cluster, this] { onClusterInit(cluster); }; if (cluster.initializePhase() == Cluster::InitializePhase::Primary) { + // Remove the previous cluster before the cluster object is destroyed. 
+ primary_init_clusters_.remove_if( + [name_to_remove = cluster.info()->name()](Cluster* cluster_iter) { + return cluster_iter->info()->name() == name_to_remove; + }); primary_init_clusters_.push_back(&cluster); cluster.initialize(initialize_cb); } else { ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary); - // Secondary clusters can be updated. Remove the previous cluster before the cluster object is - // destroyed. Note that only secondary cluster need this remove since primary clusters are - // immutable. + // Remove the previous cluster before the cluster object is destroyed. secondary_init_clusters_.remove_if( [name_to_remove = cluster.info()->name()](Cluster* cluster_iter) { return cluster_iter->info()->name() == name_to_remove; From 125ea7240264082542d4f394a91f3d86a9b9d5c1 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Wed, 4 Nov 2020 21:54:47 +0000 Subject: [PATCH 09/18] integration test cover primary cluster Signed-off-by: Yuchen Dai --- source/common/upstream/cluster_manager_impl.cc | 1 - test/integration/ads_integration_test.cc | 18 +++++++++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index c4210473f15f..521c4dd4a205 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -77,7 +77,6 @@ void ClusterManagerInitHelper::addCluster(Cluster& cluster) { [name_to_remove = cluster.info()->name()](Cluster* cluster_iter) { return cluster_iter->info()->name() == name_to_remove; }); - secondary_init_clusters_.push_back(&cluster); if (started_secondary_initialize_) { // This can happen if we get a second CDS update that adds new clusters after we have diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index 35f4b5d37720..4c802ca0502b 100644 --- a/test/integration/ads_integration_test.cc +++ 
b/test/integration/ads_integration_test.cc @@ -1188,15 +1188,23 @@ TEST_P(AdsClusterV3Test, ClusterInitializationUpdateOneOfThe2Warming) { EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); sendDiscoveryResponse( - cds_type_url, {buildCluster("cluster_0"), buildCluster("cluster_1")}, - {buildCluster("cluster_0"), buildCluster("cluster_1")}, {}, "1", false); + cds_type_url, + {ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"), + buildCluster("cluster_0"), buildCluster("cluster_1")}, + {ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"), + buildCluster("cluster_0"), buildCluster("cluster_1")}, + {}, "1", false); test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2); // Update lb policy to MAGLEV so that cluster update is not skipped due to the same hash. sendDiscoveryResponse( - cds_type_url, {buildCluster("cluster_0", "MAGLEV"), buildCluster("cluster_1")}, - {buildCluster("cluster_0", "MAGLEV"), buildCluster("cluster_1")}, {}, "2", false); + cds_type_url, + {ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"), + buildCluster("cluster_0", "MAGLEV"), buildCluster("cluster_1")}, + {ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"), + buildCluster("cluster_0", "MAGLEV"), buildCluster("cluster_1")}, + {}, "2", false); EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0", "cluster_1"}, {"cluster_0", "cluster_1"}, {})); sendDiscoveryResponse( @@ -1206,7 +1214,7 @@ TEST_P(AdsClusterV3Test, ClusterInitializationUpdateOneOfThe2Warming) { false); test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); - test_server_->waitForGaugeGe("cluster_manager.active_clusters", 3); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 4); } // Verify CDS is paused during cluster warming. 
From 7dd7bf5a8ec19a34e8708af86775df7752d8046f Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Wed, 4 Nov 2020 23:20:59 +0000 Subject: [PATCH 10/18] unit test for primary warming cluster Signed-off-by: Yuchen Dai --- .../upstream/cluster_manager_impl_test.cc | 27 +++++++++++++++++++ 1 file changed, 27 insertions(+) diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 017eb5d1eddd..c48bc082f9ed 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -3134,6 +3134,33 @@ TEST_F(ClusterManagerInitHelperTest, StaticSdsInitialize) { cluster1.initialize_callback_(); } +// Verify that primary cluster can be updated in warming state. +TEST_F(ClusterManagerInitHelperTest, TestUpdateWarming) { + InSequence s; + + auto sds = std::make_unique>(); + ON_CALL(*sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); + EXPECT_CALL(*sds, initialize(_)); + init_helper_.addCluster(*sds); + init_helper_.onStaticLoadComplete(); + + NiceMock updated_sds; + ON_CALL(updated_sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); + EXPECT_CALL(updated_sds, initialize(_)); + init_helper_.addCluster(updated_sds); + + // The override cluser is added. Manually drop the previous cluster. In production flow this is + // achieved by ClusterManagerImpl. 
+ sds.reset(); + + ReadyWatcher primary_initialized; + init_helper_.setPrimaryClustersInitializedCb([&]() -> void { primary_initialized.ready(); }); + + EXPECT_CALL(*this, onClusterInit(Ref(updated_sds))); + EXPECT_CALL(primary_initialized, ready()); + updated_sds.initialize_callback_(); +} + TEST_F(ClusterManagerInitHelperTest, UpdateAlreadyInitialized) { InSequence s; From 395e52557ed4cf522c94da19adf330170127ebf1 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Wed, 4 Nov 2020 23:26:50 +0000 Subject: [PATCH 11/18] typo Signed-off-by: Yuchen Dai --- test/common/upstream/cluster_manager_impl_test.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index c48bc082f9ed..654794c63620 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -3149,7 +3149,7 @@ TEST_F(ClusterManagerInitHelperTest, TestUpdateWarming) { EXPECT_CALL(updated_sds, initialize(_)); init_helper_.addCluster(updated_sds); - // The override cluser is added. Manually drop the previous cluster. In production flow this is + // The override cluster is added. Manually drop the previous cluster. In production flow this is // achieved by ClusterManagerImpl. sds.reset(); From fa0257660e64fe70376f7c3e6166aaf343e9755e Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Thu, 5 Nov 2020 00:41:12 +0000 Subject: [PATCH 12/18] allow apt-get update failure in azure pipeline per lizan Signed-off-by: Yuchen Dai --- .azure-pipelines/cleanup.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.azure-pipelines/cleanup.sh b/.azure-pipelines/cleanup.sh index 5611a7b212fc..3714f24cac1e 100755 --- a/.azure-pipelines/cleanup.sh +++ b/.azure-pipelines/cleanup.sh @@ -3,7 +3,7 @@ set -e # Temporary script to remove tools from Azure pipelines agent to create more disk space room. 
-sudo apt-get update -y +sudo apt-get update -y || true sudo apt-get purge -y --no-upgrade 'ghc-*' 'zulu-*-azure-jdk' 'libllvm*' 'mysql-*' 'dotnet-*' 'libgl1' \ 'adoptopenjdk-*' 'azure-cli' 'google-chrome-stable' 'firefox' 'hhvm' From b2965c3053fa618efb6c1a8b5c89069a69046add Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Thu, 5 Nov 2020 21:19:32 +0000 Subject: [PATCH 13/18] adding test Signed-off-by: Yuchen Dai --- .../common/upstream/cluster_manager_impl.cc | 2 + .../upstream/cluster_manager_impl_test.cc | 92 +++++++++++++++++++ 2 files changed, 94 insertions(+) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 521c4dd4a205..096823ff1dbe 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -64,10 +64,12 @@ void ClusterManagerInitHelper::addCluster(Cluster& cluster) { const auto initialize_cb = [&cluster, this] { onClusterInit(cluster); }; if (cluster.initializePhase() == Cluster::InitializePhase::Primary) { // Remove the previous cluster before the cluster object is destroyed. 
+ ENVOY_LOG_MISC(debug, "lambdai: before remove primary size = {}", primary_init_clusters_.size()); primary_init_clusters_.remove_if( [name_to_remove = cluster.info()->name()](Cluster* cluster_iter) { return cluster_iter->info()->name() == name_to_remove; }); + ENVOY_LOG_MISC(debug, "lambdai: after remove primary size = {}", primary_init_clusters_.size()); primary_init_clusters_.push_back(&cluster); cluster.initialize(initialize_cb); } else { diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 654794c63620..35227950a954 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -1235,6 +1235,98 @@ TEST_F(ClusterManagerImplTest, RemoveWarmingCluster) { EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get())); } +TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { + time_system_.setSystemTime(std::chrono::milliseconds(1234567891234)); + + const std::string json = fmt::sprintf( + R"EOF( + { + "dynamic_resources": { + "cds_config": { + "api_config_source": { + "api_type": "0", + "refresh_delay": "30s", + "cluster_names": ["cds_cluster"] + } + } + }, + "static_resources": { + %s + } + } + )EOF", + clustersJson({ + defaultStaticClusterJson("cds_cluster"), + })); + + MockCdsApi* cds = new MockCdsApi(); + std::shared_ptr cds_cluster( + new NiceMock()); + cds_cluster->info_->name_ = "cds_cluster"; + + // This part tests static init. 
+ InSequence s; + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)) + .WillOnce(Return(std::make_pair(cds_cluster, nullptr))); + ON_CALL(*cds_cluster, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); + EXPECT_CALL(factory_, createCds_()).WillOnce(Return(cds)); + EXPECT_CALL(*cds, setInitializedCb(_)); + EXPECT_CALL(*cds_cluster, initialize(_)); + + create(parseBootstrapFromV3Json(json)); + + ReadyWatcher initialized; + cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); }); + + // This part tests CDS init. + std::shared_ptr cluster3(new NiceMock()); + cluster3->info_->name_ = "cluster3"; + + const std::string no_health_check_yaml = R"EOF( + name: fake_cluster + connect_timeout: 0.250s + type: STATIC + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: fake_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: 127.0.0.1 + port_value: 11001 + )EOF"; + + const std::string health_check_cluster_yaml = no_health_check_yaml + R"EOF( + health_checks: + - timeout: 1s + interval: 1s + unhealthy_threshold: 2 + healthy_threshold: 2 + http_health_check: + path: "/healthcheck" + )EOF"; + + Network::MockClientConnection* connection = new NiceMock(); + EXPECT_CALL(factory_.dispatcher_, + createClientConnection_( + PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:11001")), _, _, _)) + .WillOnce(Return(connection)); + EXPECT_CALL(initialized, ready()).Times(0); + + ENVOY_LOG_MISC(debug, "lambdai: adding health check cluster"); + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(health_check_cluster_yaml), "heath_check")); + + ENVOY_LOG_MISC(debug, "lambdai: adding no health check cluster"); + EXPECT_CALL(initialized, ready()); + + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(no_health_check_yaml), + "no_heath_check")); + +} + TEST_F(ClusterManagerImplTest, ModifyWarmingCluster) { 
time_system_.setSystemTime(std::chrono::milliseconds(1234567891234)); create(defaultConfig()); From 7fb0b15eacb00a2308731bc91d2bca46705174a8 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Thu, 5 Nov 2020 22:20:42 +0000 Subject: [PATCH 14/18] test complete Signed-off-by: Yuchen Dai --- .../common/upstream/cluster_manager_impl.cc | 2 - .../upstream/cluster_manager_impl_test.cc | 59 ++++++++++++------- 2 files changed, 39 insertions(+), 22 deletions(-) diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 9c84514f5f23..6eaef38a901b 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -64,12 +64,10 @@ void ClusterManagerInitHelper::addCluster(Cluster& cluster) { const auto initialize_cb = [&cluster, this] { onClusterInit(cluster); }; if (cluster.initializePhase() == Cluster::InitializePhase::Primary) { // Remove the previous cluster before the cluster object is destroyed. 
- ENVOY_LOG_MISC(debug, "lambdai: before remove primary size = {}", primary_init_clusters_.size()); primary_init_clusters_.remove_if( [name_to_remove = cluster.info()->name()](Cluster* cluster_iter) { return cluster_iter->info()->name() == name_to_remove; }); - ENVOY_LOG_MISC(debug, "lambdai: after remove primary size = {}", primary_init_clusters_.size()); primary_init_clusters_.push_back(&cluster); cluster.initialize(initialize_cb); } else { diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index 2deedc67f468..b6fe0e5d8fc2 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -1273,12 +1273,8 @@ TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { create(parseBootstrapFromV3Json(json)); - ReadyWatcher initialized; - cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); }); - - // This part tests CDS init. - std::shared_ptr cluster3(new NiceMock()); - cluster3->info_->name_ = "cluster3"; + ReadyWatcher cm_initialized; + cluster_manager_->setInitializedCb([&]() -> void { cm_initialized.ready(); }); const std::string no_health_check_yaml = R"EOF( name: fake_cluster @@ -1296,7 +1292,20 @@ TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { port_value: 11001 )EOF"; - const std::string health_check_cluster_yaml = no_health_check_yaml + R"EOF( + const std::string health_check_cluster_yaml = R"EOF( + name: fake_cluster + connect_timeout: 0.250s + type: STRICT_DNS + lb_policy: ROUND_ROBIN + load_assignment: + cluster_name: fake_cluster + endpoints: + - lb_endpoints: + - endpoint: + address: + socket_address: + address: foo.com + port_value: 11001 health_checks: - timeout: 1s interval: 1s @@ -1306,23 +1315,33 @@ TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { path: "/healthcheck" )EOF"; - Network::MockClientConnection* connection = new 
NiceMock(); - EXPECT_CALL(factory_.dispatcher_, - createClientConnection_( - PointeesEq(Network::Utility::resolveUrl("tcp://127.0.0.1:11001")), _, _, _)) - .WillOnce(Return(connection)); - EXPECT_CALL(initialized, ready()).Times(0); + { + SCOPED_TRACE("Add a primary cluster that never ready."); - ENVOY_LOG_MISC(debug, "lambdai: adding health check cluster"); - EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( - parseClusterFromV3Yaml(health_check_cluster_yaml), "heath_check")); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)); + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( + parseClusterFromV3Yaml(health_check_cluster_yaml), "heath_check")); - ENVOY_LOG_MISC(debug, "lambdai: adding no health check cluster"); - EXPECT_CALL(initialized, ready()); + // Mark all the rest of the clusters ready. Now the only warming cluster is the above one. + EXPECT_CALL(cm_initialized, ready()).Times(0); + cds_cluster->initialize_callback_(); + } - EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(no_health_check_yaml), - "no_heath_check")); + { + SCOPED_TRACE("Modify primary cluster by immediate initialized cluster"); + EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)); + EXPECT_CALL(*cds, initialize()); + EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(no_health_check_yaml), + "no_heath_check")); + } + { + SCOPED_TRACE("All clusters are ready. Mark cds ready. Cluster manager should be initialized."); + EXPECT_CALL(cm_initialized, ready()); + cds->initialized_callback_(); + } + factory_.tls_.shutdownThread(); + cluster_manager_->shutdown(); } TEST_F(ClusterManagerImplTest, ModifyWarmingCluster) { From 43bb75df8d73fdb5975e8783235100d1c3929269 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Thu, 5 Nov 2020 22:36:12 +0000 Subject: [PATCH 15/18] fix singleton check by executing VerifyAndClearExpectations, why?
Signed-off-by: Yuchen Dai --- .../upstream/cluster_manager_impl_test.cc | 34 ++++++------------- 1 file changed, 11 insertions(+), 23 deletions(-) diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index b6fe0e5d8fc2..155e2b628857 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -1234,8 +1234,6 @@ TEST_F(ClusterManagerImplTest, RemoveWarmingCluster) { } TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { - time_system_.setSystemTime(std::chrono::milliseconds(1234567891234)); - const std::string json = fmt::sprintf( R"EOF( { @@ -1276,7 +1274,7 @@ TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { ReadyWatcher cm_initialized; cluster_manager_->setInitializedCb([&]() -> void { cm_initialized.ready(); }); - const std::string no_health_check_yaml = R"EOF( + const std::string ready_cluster_yaml = R"EOF( name: fake_cluster connect_timeout: 0.250s type: STATIC @@ -1292,7 +1290,7 @@ TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { port_value: 11001 )EOF"; - const std::string health_check_cluster_yaml = R"EOF( + const std::string warming_cluster_yaml = R"EOF( name: fake_cluster connect_timeout: 0.250s type: STRICT_DNS @@ -1305,22 +1303,14 @@ TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { address: socket_address: address: foo.com - port_value: 11001 - health_checks: - - timeout: 1s - interval: 1s - unhealthy_threshold: 2 - healthy_threshold: 2 - http_health_check: - path: "/healthcheck" + port_value: 11001 )EOF"; { - SCOPED_TRACE("Add a primary cluster that never ready."); - + SCOPED_TRACE("Add a primary cluster staying in warming."); EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)); - EXPECT_TRUE(cluster_manager_->addOrUpdateCluster( - parseClusterFromV3Yaml(health_check_cluster_yaml), "heath_check")); + 
EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(warming_cluster_yaml), + "warming")); // Mark all the rest of the clusters ready. Now the only warming cluster is the above one. EXPECT_CALL(cm_initialized, ready()).Times(0); @@ -1328,20 +1318,18 @@ TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) { } { - SCOPED_TRACE("Modify primary cluster by immediate initialized cluster"); + SCOPED_TRACE("Modify the only warming primary cluster to immediate ready."); EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _)); EXPECT_CALL(*cds, initialize()); - EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(no_health_check_yaml), - "no_heath_check")); + EXPECT_TRUE( + cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(ready_cluster_yaml), "ready")); } { - SCOPED_TRACE("All clusters are ready. Mark cds ready. Cluster manager should be initialized."); + SCOPED_TRACE("All clusters are ready."); EXPECT_CALL(cm_initialized, ready()); cds->initialized_callback_(); } - - factory_.tls_.shutdownThread(); - cluster_manager_->shutdown(); + EXPECT_TRUE(Mock::VerifyAndClearExpectations(cds_cluster.get())); } TEST_F(ClusterManagerImplTest, ModifyWarmingCluster) { From 7f991cd8748e4427d892c7105c868f3295d31a03 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Sat, 7 Nov 2020 00:00:01 +0000 Subject: [PATCH 16/18] add integration test Signed-off-by: Yuchen Dai --- test/integration/ads_integration_test.cc | 39 ++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index 4c802ca0502b..de3f72aaa469 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -1178,6 +1178,45 @@ TEST_P(AdsClusterV3Test, ClusterInitializationUpdateTheOnlyWarmingCluster) { test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); } +// Primary cluster is warming during cluster initialization. 
Update the cluster with immediate ready +// config and verify that all the clusters are initialized. +TEST_P(AdsClusterV3Test, TestPrimaryClusterWarmClusterInitialization) { + initialize(); + const auto cds_type_url = Config::getTypeUrl( + envoy::config::core::v3::ApiVersion::V3); + auto loopback = Network::Test::getLoopbackAddressString(ipVersion()); + addFakeUpstream(FakeHttpConnection::Type::HTTP2); + auto port = fake_upstreams_.back()->localAddress()->ip()->port(); + + // This cluster will be blocked since endpoint name cannot be resolved. + auto warming_cluster = + ConfigHelper::buildStaticCluster("fake_cluster", 12345, "notexist.foo.com"); + warming_cluster.set_type(envoy::config::cluster::v3::Cluster::STRICT_DNS); + warming_cluster.set_use_tcp_for_dns_lookups(true); + auto dns_resolver = warming_cluster.mutable_dns_resolvers()->Add(); + dns_resolver->mutable_socket_address()->set_address(loopback); + dns_resolver->mutable_socket_address()->set_port_value(port); + auto active_cluster = ConfigHelper::buildStaticCluster("fake_cluster", 12346, loopback); + + EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); + sendDiscoveryResponse(cds_type_url, {warming_cluster}, + {warming_cluster}, {}, "1", false); + + FakeRawConnectionPtr fake_upstream_connection; + ASSERT_TRUE(fake_upstreams_.back()->waitForRawConnection(fake_upstream_connection)); + + // fake_cluster is in warming. + test_server_->waitForGaugeGe("cluster_manager.warming_clusters", 1); + + // Now replace the warming cluster by the config which will turn ready immediately. + sendDiscoveryResponse(cds_type_url, {active_cluster}, + {active_cluster}, {}, "2", false); + + // All clusters are ready. + test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0); + test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2); +} + // Two cluster warming, update one of them. Verify that the clusters are eventually initialized. 
TEST_P(AdsClusterV3Test, ClusterInitializationUpdateOneOfThe2Warming) { initialize(); From 278b80ace6e29f82da7ae4f94f7e13310111ccc8 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Sat, 7 Nov 2020 00:02:46 +0000 Subject: [PATCH 17/18] revert grpc client integration Signed-off-by: Yuchen Dai --- test/common/grpc/grpc_client_integration.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/common/grpc/grpc_client_integration.h b/test/common/grpc/grpc_client_integration.h index bbcc63403784..a7bd2ee4b5d7 100644 --- a/test/common/grpc/grpc_client_integration.h +++ b/test/common/grpc/grpc_client_integration.h @@ -122,7 +122,7 @@ class DeltaSotwIntegrationParamTest #define DELTA_SOTW_GRPC_CLIENT_INTEGRATION_PARAMS \ testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ testing::Values(Grpc::ClientType::EnvoyGrpc, Grpc::ClientType::GoogleGrpc), \ - testing::Values(Grpc::SotwOrDelta::Sotw)) + testing::Values(Grpc::SotwOrDelta::Sotw, Grpc::SotwOrDelta::Delta)) #else #define GRPC_CLIENT_INTEGRATION_PARAMS \ testing::Combine(testing::ValuesIn(TestEnvironment::getIpVersionsForTest()), \ From 9927f6ad794c127d0a6ade18633d2a83727f4989 Mon Sep 17 00:00:00 2001 From: Yuchen Dai Date: Sat, 7 Nov 2020 19:32:15 +0000 Subject: [PATCH 18/18] replace dns by tcp health check Signed-off-by: Yuchen Dai --- test/integration/ads_integration_test.cc | 26 ++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/test/integration/ads_integration_test.cc b/test/integration/ads_integration_test.cc index de3f72aaa469..053dfcbc9868 100644 --- a/test/integration/ads_integration_test.cc +++ b/test/integration/ads_integration_test.cc @@ -1189,14 +1189,24 @@ TEST_P(AdsClusterV3Test, TestPrimaryClusterWarmClusterInitialization) { auto port = fake_upstreams_.back()->localAddress()->ip()->port(); // This cluster will be blocked since endpoint name cannot be resolved. 
- auto warming_cluster = - ConfigHelper::buildStaticCluster("fake_cluster", 12345, "notexist.foo.com"); - warming_cluster.set_type(envoy::config::cluster::v3::Cluster::STRICT_DNS); - warming_cluster.set_use_tcp_for_dns_lookups(true); - auto dns_resolver = warming_cluster.mutable_dns_resolvers()->Add(); - dns_resolver->mutable_socket_address()->set_address(loopback); - dns_resolver->mutable_socket_address()->set_port_value(port); - auto active_cluster = ConfigHelper::buildStaticCluster("fake_cluster", 12346, loopback); + auto warming_cluster = ConfigHelper::buildStaticCluster("fake_cluster", port, loopback); + // Below endpoint accepts request but never return. The health check hangs 1 hour which covers the + // test running. + auto blocking_health_check = TestUtility::parseYaml(R"EOF( + timeout: 3600s + interval: 3600s + unhealthy_threshold: 2 + healthy_threshold: 2 + tcp_health_check: + send: + text: '01' + receive: + - text: '02' + )EOF"); + *warming_cluster.add_health_checks() = blocking_health_check; + + // Active cluster has the same name with warming cluster but has no blocking health check. + auto active_cluster = ConfigHelper::buildStaticCluster("fake_cluster", port, loopback); EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true)); sendDiscoveryResponse(cds_type_url, {warming_cluster},