Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster: unstuck cluster manager when update the initializing cluster #13875

Merged
merged 20 commits into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 32 additions & 9 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,20 @@ void ClusterManagerInitHelper::addCluster(Cluster& cluster) {

const auto initialize_cb = [&cluster, this] { onClusterInit(cluster); };
if (cluster.initializePhase() == Cluster::InitializePhase::Primary) {
// Remove the previous cluster before the cluster object is destroyed.
primary_init_clusters_.remove_if(
[name_to_remove = cluster.info()->name()](Cluster* cluster_iter) {
return cluster_iter->info()->name() == name_to_remove;
});
primary_init_clusters_.push_back(&cluster);
cluster.initialize(initialize_cb);
} else {
ASSERT(cluster.initializePhase() == Cluster::InitializePhase::Secondary);
// Remove the previous cluster before the cluster object is destroyed.
secondary_init_clusters_.remove_if(
[name_to_remove = cluster.info()->name()](Cluster* cluster_iter) {
return cluster_iter->info()->name() == name_to_remove;
});
secondary_init_clusters_.push_back(&cluster);
if (started_secondary_initialize_) {
// This can happen if we get a second CDS update that adds new clusters after we have
Expand Down Expand Up @@ -613,7 +623,9 @@ bool ClusterManagerImpl::addOrUpdateCluster(const envoy::config::cluster::v3::Cl
// and easy to understand.
const bool all_clusters_initialized =
init_helper_.state() == ClusterManagerInitHelper::State::AllClustersInitialized;
loadCluster(cluster, version_info, true, warming_clusters_);
// Preserve the previous cluster data to avoid early destroy. The same cluster should be added
// before destroy to avoid early initialization complete.
const auto previous_cluster = loadCluster(cluster, version_info, true, warming_clusters_);
auto& cluster_entry = warming_clusters_.at(cluster_name);
if (!all_clusters_initialized) {
ENVOY_LOG(debug, "add/update cluster {} during init", cluster_name);
Expand Down Expand Up @@ -702,9 +714,10 @@ bool ClusterManagerImpl::removeCluster(const std::string& cluster_name) {
return removed;
}

void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
const std::string& version_info, bool added_via_api,
ClusterMap& cluster_map) {
ClusterManagerImpl::ClusterDataPtr
ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
const std::string& version_info, bool added_via_api,
ClusterMap& cluster_map) {
std::pair<ClusterSharedPtr, ThreadAwareLoadBalancerPtr> new_cluster_pair =
factory_.clusterFromProto(cluster, *this, outlier_event_logger_, added_via_api);
auto& new_cluster = new_cluster_pair.first;
Expand Down Expand Up @@ -749,11 +762,20 @@ void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster&
}
});
}

cluster_map[cluster_reference.info()->name()] = std::make_unique<ClusterData>(
cluster, version_info, added_via_api, std::move(new_cluster), time_source_);
const auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name());

ClusterDataPtr result;
auto cluster_entry_it = cluster_map.find(cluster_reference.info()->name());
if (cluster_entry_it != cluster_map.end()) {
result = std::exchange(cluster_entry_it->second,
std::make_unique<ClusterData>(cluster, version_info, added_via_api,
std::move(new_cluster), time_source_));
} else {
bool inserted = false;
std::tie(cluster_entry_it, inserted) =
cluster_map.emplace(cluster_reference.info()->name(),
std::make_unique<ClusterData>(cluster, version_info, added_via_api,
std::move(new_cluster), time_source_));
ASSERT(inserted);
}
// If an LB is thread aware, create it here. The LB is not initialized until cluster pre-init
// finishes. For RingHash/Maglev don't create the LB here if subset balancing is enabled,
// because the thread_aware_lb_ field takes precedence over the subset lb).
Expand All @@ -776,6 +798,7 @@ void ClusterManagerImpl::loadCluster(const envoy::config::cluster::v3::Cluster&
}

updateClusterCounts();
return result;
}

void ClusterManagerImpl::updateClusterCounts() {
Expand Down
10 changes: 8 additions & 2 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -477,8 +477,14 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
void createOrUpdateThreadLocalCluster(ClusterData& cluster);
ProtobufTypes::MessagePtr dumpClusterConfigs();
static ClusterManagerStats generateStats(Stats::Scope& scope);
void loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
const std::string& version_info, bool added_via_api, ClusterMap& cluster_map);

/**
* @return ClusterDataPtr contains the previous cluster in the cluster_map, or
* nullptr if cluster_map did not contain the same cluster.
*/
ClusterDataPtr loadCluster(const envoy::config::cluster::v3::Cluster& cluster,
const std::string& version_info, bool added_via_api,
ClusterMap& cluster_map);
void onClusterInit(Cluster& cluster);
void postThreadLocalHealthFailure(const HostSharedPtr& host);
void updateClusterCounts();
Expand Down
129 changes: 129 additions & 0 deletions test/common/upstream/cluster_manager_impl_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1233,6 +1233,105 @@ TEST_F(ClusterManagerImplTest, RemoveWarmingCluster) {
EXPECT_TRUE(Mock::VerifyAndClearExpectations(cluster1.get()));
}

TEST_F(ClusterManagerImplTest, TestModifyWarmingClusterDuringInitialization) {
const std::string json = fmt::sprintf(
R"EOF(
{
"dynamic_resources": {
"cds_config": {
"api_config_source": {
"api_type": "0",
"refresh_delay": "30s",
"cluster_names": ["cds_cluster"]
}
}
},
"static_resources": {
%s
}
}
)EOF",
clustersJson({
defaultStaticClusterJson("cds_cluster"),
}));

MockCdsApi* cds = new MockCdsApi();
std::shared_ptr<MockClusterMockPrioritySet> cds_cluster(
new NiceMock<MockClusterMockPrioritySet>());
cds_cluster->info_->name_ = "cds_cluster";

// This part tests static init.
InSequence s;
EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _))
.WillOnce(Return(std::make_pair(cds_cluster, nullptr)));
ON_CALL(*cds_cluster, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary));
EXPECT_CALL(factory_, createCds_()).WillOnce(Return(cds));
EXPECT_CALL(*cds, setInitializedCb(_));
EXPECT_CALL(*cds_cluster, initialize(_));

create(parseBootstrapFromV3Json(json));

ReadyWatcher cm_initialized;
cluster_manager_->setInitializedCb([&]() -> void { cm_initialized.ready(); });

const std::string ready_cluster_yaml = R"EOF(
name: fake_cluster
connect_timeout: 0.250s
type: STATIC
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: fake_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: 127.0.0.1
port_value: 11001
)EOF";

const std::string warming_cluster_yaml = R"EOF(
name: fake_cluster
connect_timeout: 0.250s
type: STRICT_DNS
lb_policy: ROUND_ROBIN
load_assignment:
cluster_name: fake_cluster
endpoints:
- lb_endpoints:
- endpoint:
address:
socket_address:
address: foo.com
port_value: 11001
)EOF";

{
SCOPED_TRACE("Add a primary cluster staying in warming.");
EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _));
EXPECT_TRUE(cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(warming_cluster_yaml),
"warming"));

// Mark all the rest of the clusters ready. Now the only warming cluster is the above one.
EXPECT_CALL(cm_initialized, ready()).Times(0);
cds_cluster->initialize_callback_();
}

{
SCOPED_TRACE("Modify the only warming primary cluster to immediate ready.");
EXPECT_CALL(factory_, clusterFromProto_(_, _, _, _));
EXPECT_CALL(*cds, initialize());
EXPECT_TRUE(
cluster_manager_->addOrUpdateCluster(parseClusterFromV3Yaml(ready_cluster_yaml), "ready"));
}
{
SCOPED_TRACE("All clusters are ready.");
EXPECT_CALL(cm_initialized, ready());
cds->initialized_callback_();
}
EXPECT_TRUE(Mock::VerifyAndClearExpectations(cds_cluster.get()));
}

TEST_F(ClusterManagerImplTest, ModifyWarmingCluster) {
time_system_.setSystemTime(std::chrono::milliseconds(1234567891234));
create(defaultConfig());
Expand Down Expand Up @@ -2984,6 +3083,33 @@ TEST_F(ClusterManagerInitHelperTest, StaticSdsInitialize) {
cluster1.initialize_callback_();
}

// Verify that primary cluster can be updated in warming state.
TEST_F(ClusterManagerInitHelperTest, TestUpdateWarming) {
InSequence s;

auto sds = std::make_unique<NiceMock<MockClusterMockPrioritySet>>();
ON_CALL(*sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary));
EXPECT_CALL(*sds, initialize(_));
init_helper_.addCluster(*sds);
init_helper_.onStaticLoadComplete();

NiceMock<MockClusterMockPrioritySet> updated_sds;
ON_CALL(updated_sds, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary));
EXPECT_CALL(updated_sds, initialize(_));
init_helper_.addCluster(updated_sds);

// The override cluster is added. Manually drop the previous cluster. In production flow this is
// achieved by ClusterManagerImpl.
sds.reset();
Comment on lines +3101 to +3103
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to add a cluster manager test of this case that uses "real" DNS clusters? I think the main sequence that can actually happen here is:

  1. CDS adds DNS cluster
  2. DNS cluster begins init
  3. CDS updates DNS cluster?

I think you could also pretty easily test this in your integration test below by doing:

  1. Have CDS deliver a DNS cluster configured with health checking.
  2. DNS cluster won't initialize pending health checking.
  3. Update DNS cluster

?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the DNS with health check!

After a quick search I didn't find an existing example but this test case
TEST_F(ClusterManagerImplTest, RemoveWarmingCluster) seem putting a defaultStaticCluster("fake_cluster") in warm.

Writing a test

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A slight difference is that DNS cluster is always depending on resolver.

I modified the cluster to STATIC type to make it immediate ready.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks this all looks good, but can you add a real integration test for this in ADS integration test? I think it's really easy based on what you already have. Just use a static cluster with health checking, it will be stuck initializing, then you can update it?

/wait

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had the impression that static cluster with bad health check doesn't block initialization...
Yeah, adding integrate test to confirm initialization and the behavior health check behavior. Will udpate

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added.

Also confirmed that failed health check in static cluster also turn the warm cluster to ready.
And STRICT DNS resolve failure has the same scary side effect.

I end up not accept the tcp connection but not write back to hold the DNS request.


ReadyWatcher primary_initialized;
init_helper_.setPrimaryClustersInitializedCb([&]() -> void { primary_initialized.ready(); });

EXPECT_CALL(*this, onClusterInit(Ref(updated_sds)));
EXPECT_CALL(primary_initialized, ready());
updated_sds.initialize_callback_();
}

TEST_F(ClusterManagerInitHelperTest, UpdateAlreadyInitialized) {
InSequence s;

Expand Down Expand Up @@ -3087,6 +3213,7 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) {
init_helper_.addCluster(cluster1);

NiceMock<MockClusterMockPrioritySet> cluster2;
cluster2.info_->name_ = "cluster2";
ON_CALL(cluster2, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary));
init_helper_.addCluster(cluster2);

Expand All @@ -3099,6 +3226,8 @@ TEST_F(ClusterManagerInitHelperTest, AddSecondaryAfterSecondaryInit) {
init_helper_.startInitializingSecondaryClusters();

NiceMock<MockClusterMockPrioritySet> cluster3;
cluster3.info_->name_ = "cluster3";

ON_CALL(cluster3, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Secondary));
EXPECT_CALL(cluster3, initialize(_));
init_helper_.addCluster(cluster3);
Expand Down
5 changes: 3 additions & 2 deletions test/integration/ads_integration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ AdsIntegrationTest::AdsIntegrationTest(const envoy::config::core::v3::ApiVersion

void AdsIntegrationTest::TearDown() { cleanUpXdsConnection(); }

envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name) {
return ConfigHelper::buildCluster(name, "ROUND_ROBIN", api_version_);
envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildCluster(const std::string& name,
const std::string& lb_policy) {
return ConfigHelper::buildCluster(name, lb_policy, api_version_);
}

envoy::config::cluster::v3::Cluster AdsIntegrationTest::buildTlsCluster(const std::string& name) {
Expand Down
3 changes: 2 additions & 1 deletion test/integration/ads_integration.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ class AdsIntegrationTest : public Grpc::DeltaSotwIntegrationParamTest, public Ht

void TearDown() override;

envoy::config::cluster::v3::Cluster buildCluster(const std::string& name);
envoy::config::cluster::v3::Cluster buildCluster(const std::string& name,
const std::string& lb_policy = "ROUND_ROBIN");

envoy::config::cluster::v3::Cluster buildTlsCluster(const std::string& name);

Expand Down
114 changes: 114 additions & 0 deletions test/integration/ads_integration_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1152,6 +1152,120 @@ TEST_P(AdsClusterV3Test, BasicClusterInitialWarming) {
test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2);
}

// Update the only warming cluster. Verify that the new cluster is still warming and the cluster
// manager as a whole is not initialized.
TEST_P(AdsClusterV3Test, ClusterInitializationUpdateTheOnlyWarmingCluster) {
initialize();
const auto cds_type_url = Config::getTypeUrl<envoy::config::cluster::v3::Cluster>(
envoy::config::core::v3::ApiVersion::V3);
const auto eds_type_url = Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(
envoy::config::core::v3::ApiVersion::V3);

EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true));
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
cds_type_url, {buildCluster("cluster_0")}, {buildCluster("cluster_0")}, {}, "1", false);
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 1);
// Update lb policy to MAGLEV so that cluster update is not skipped due to the same hash.
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
cds_type_url, {buildCluster("cluster_0", "MAGLEV")}, {buildCluster("cluster_0", "MAGLEV")},
{}, "2", false);
EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0"}, {"cluster_0"}, {}));
sendDiscoveryResponse<envoy::config::endpoint::v3::ClusterLoadAssignment>(
eds_type_url, {buildClusterLoadAssignment("cluster_0")},
{buildClusterLoadAssignment("cluster_0")}, {}, "1", false);

test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0);
test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2);
}

// Primary cluster is warming during cluster initialization. Update the cluster with immediate ready
// config and verify that all the clusters are initialized.
TEST_P(AdsClusterV3Test, TestPrimaryClusterWarmClusterInitialization) {
initialize();
const auto cds_type_url = Config::getTypeUrl<envoy::config::cluster::v3::Cluster>(
envoy::config::core::v3::ApiVersion::V3);
auto loopback = Network::Test::getLoopbackAddressString(ipVersion());
addFakeUpstream(FakeHttpConnection::Type::HTTP2);
auto port = fake_upstreams_.back()->localAddress()->ip()->port();

// This cluster will be blocked since endpoint name cannot be resolved.
auto warming_cluster = ConfigHelper::buildStaticCluster("fake_cluster", port, loopback);
// Below endpoint accepts request but never return. The health check hangs 1 hour which covers the
// test running.
auto blocking_health_check = TestUtility::parseYaml<envoy::config::core::v3::HealthCheck>(R"EOF(
timeout: 3600s
interval: 3600s
unhealthy_threshold: 2
healthy_threshold: 2
tcp_health_check:
send:
text: '01'
receive:
- text: '02'
)EOF");
*warming_cluster.add_health_checks() = blocking_health_check;

// Active cluster has the same name with warming cluster but has no blocking health check.
auto active_cluster = ConfigHelper::buildStaticCluster("fake_cluster", port, loopback);

EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true));
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(cds_type_url, {warming_cluster},
{warming_cluster}, {}, "1", false);

FakeRawConnectionPtr fake_upstream_connection;
ASSERT_TRUE(fake_upstreams_.back()->waitForRawConnection(fake_upstream_connection));

// fake_cluster is in warming.
test_server_->waitForGaugeGe("cluster_manager.warming_clusters", 1);

// Now replace the warming cluster by the config which will turn ready immediately.
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(cds_type_url, {active_cluster},
{active_cluster}, {}, "2", false);

// All clusters are ready.
test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0);
test_server_->waitForGaugeGe("cluster_manager.active_clusters", 2);
}

// Two cluster warming, update one of them. Verify that the clusters are eventually initialized.
TEST_P(AdsClusterV3Test, ClusterInitializationUpdateOneOfThe2Warming) {
initialize();
const auto cds_type_url = Config::getTypeUrl<envoy::config::cluster::v3::Cluster>(
envoy::config::core::v3::ApiVersion::V3);
const auto eds_type_url = Config::getTypeUrl<envoy::config::endpoint::v3::ClusterLoadAssignment>(
envoy::config::core::v3::ApiVersion::V3);

EXPECT_TRUE(compareDiscoveryRequest(cds_type_url, "", {}, {}, {}, true));
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
cds_type_url,
{ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"),
buildCluster("cluster_0"), buildCluster("cluster_1")},
{ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"),
buildCluster("cluster_0"), buildCluster("cluster_1")},
{}, "1", false);

test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 2);

// Update lb policy to MAGLEV so that cluster update is not skipped due to the same hash.
sendDiscoveryResponse<envoy::config::cluster::v3::Cluster>(
cds_type_url,
{ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"),
buildCluster("cluster_0", "MAGLEV"), buildCluster("cluster_1")},
{ConfigHelper::buildStaticCluster("primary_cluster", 8000, "127.0.0.1"),
buildCluster("cluster_0", "MAGLEV"), buildCluster("cluster_1")},
{}, "2", false);
EXPECT_TRUE(compareDiscoveryRequest(eds_type_url, "", {"cluster_0", "cluster_1"},
{"cluster_0", "cluster_1"}, {}));
sendDiscoveryResponse<envoy::config::endpoint::v3::ClusterLoadAssignment>(
eds_type_url,
{buildClusterLoadAssignment("cluster_0"), buildClusterLoadAssignment("cluster_1")},
{buildClusterLoadAssignment("cluster_0"), buildClusterLoadAssignment("cluster_1")}, {}, "1",
false);

test_server_->waitForGaugeEq("cluster_manager.warming_clusters", 0);
test_server_->waitForGaugeGe("cluster_manager.active_clusters", 4);
}

// Verify CDS is paused during cluster warming.
TEST_P(AdsClusterV3Test, CdsPausedDuringWarming) {
initialize();
Expand Down
Loading