diff --git a/source/common/upstream/cluster_manager_impl.cc b/source/common/upstream/cluster_manager_impl.cc index 7c9246d88b61..af026fe66b23 100644 --- a/source/common/upstream/cluster_manager_impl.cc +++ b/source/common/upstream/cluster_manager_impl.cc @@ -371,37 +371,25 @@ ClusterManagerImpl::ClusterManagerImpl( cm_stats_.cluster_added_.add(bootstrap.static_resources().clusters().size()); updateClusterCounts(); - if (local_cluster_name_ && - (active_clusters_.find(local_cluster_name_.value()) == active_clusters_.end())) { - throw EnvoyException( - fmt::format("local cluster '{}' must be defined", local_cluster_name_.value())); + absl::optional local_cluster_params; + if (local_cluster_name_) { + auto local_cluster = active_clusters_.find(local_cluster_name_.value()); + if (local_cluster == active_clusters_.end()) { + throw EnvoyException( + fmt::format("local cluster '{}' must be defined", local_cluster_name_.value())); + } + local_cluster_params.emplace(); + local_cluster_params->info_ = local_cluster->second->cluster().info(); + local_cluster_params->load_balancer_factory_ = local_cluster->second->loadBalancerFactory(); + local_cluster->second->setAddedOrUpdated(); } // Once the initial set of static bootstrap clusters are created (including the local cluster), // we can instantiate the thread local cluster manager. - tls_.set([this](Event::Dispatcher& dispatcher) { - return std::make_shared(*this, dispatcher); + tls_.set([this, local_cluster_params](Event::Dispatcher& dispatcher) { + return std::make_shared(*this, dispatcher, local_cluster_params); }); - // For active clusters that exist in bootstrap, post an empty thread local cluster update to - // populate them. - // TODO(mattklein123): It would be nice if we did not do this and instead all thread local cluster - // creation happened as part of the cluster init flow, however there are certain cases that depend - // on this behavior including route checking. 
It may be possible to fix static route checking to - // not depend on this behavior, but for now this is consistent with the way we have always done - // this so in the interest of minimal change it is not being done now. - for (auto& cluster : active_clusters_) { - // Skip posting the thread local cluster which is created as part of the thread local cluster - // manager constructor. See the TODO in that code for eventually cleaning this up. - if (local_cluster_name_ && local_cluster_name_.value() == cluster.first) { - continue; - } - - // Avoid virtual call in the constructor. This only impacts tests. Remove this when fixing - // the above TODO. - postThreadLocalClusterUpdateNonVirtual(*cluster.second, ThreadLocalClusterUpdateParams()); - } - // We can now potentially create the CDS API once the backing cluster exists. if (dyn_resources.has_cds_config()) { cds_api_ = factory_.createCds(dyn_resources.cds_config(), *this); @@ -536,14 +524,11 @@ void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) { params.per_priority_update_params_.emplace_back(host_set->priority(), host_set->hosts(), HostVector{}); } - // At this point the update is posted if either there are actual updates or the cluster has - // not been added yet. The latter can only happen with dynamic cluster as static clusters are - // added immediately. - // TODO(mattklein123): Per related TODOs we will see if we can centralize all logic so that - // clusters only get added in this path and all of the special casing can be removed. - if (!params.per_priority_update_params_.empty() || !cm_cluster.addedOrUpdated()) { - postThreadLocalClusterUpdate(cm_cluster, std::move(params)); - } + // NOTE: In all cases *other* than the local cluster, this is when a cluster is added/updated + // The local cluster must currently be statically defined and must exist prior to other + // clusters being added/updated. 
We could gate the below update on hosts being available on + // the cluster or the cluster not already existing, but the special logic is not worth it. + postThreadLocalClusterUpdate(cm_cluster, std::move(params)); } bool ClusterManagerImpl::scheduleUpdate(ClusterManagerCluster& cluster, uint32_t priority, @@ -929,21 +914,13 @@ void ClusterManagerImpl::postThreadLocalDrainConnections(const Cluster& cluster, }); } -void ClusterManagerImpl::postThreadLocalClusterUpdateNonVirtual( - ClusterManagerCluster& cm_cluster, ThreadLocalClusterUpdateParams&& params) { - const bool is_local_cluster = local_cluster_name_.has_value() && - local_cluster_name_.value() == cm_cluster.cluster().info()->name(); +void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster, + ThreadLocalClusterUpdateParams&& params) { bool add_or_update_cluster = false; if (!cm_cluster.addedOrUpdated()) { add_or_update_cluster = true; cm_cluster.setAddedOrUpdated(); } - if (is_local_cluster) { - // TODO(mattklein123): This is needed because of the special case of how local cluster is - // initialized in the thread local cluster manager constructor. This will all be cleaned up - // in a follow up. 
- add_or_update_cluster = false; - } LoadBalancerFactorySharedPtr load_balancer_factory; if (add_or_update_cluster) { @@ -961,6 +938,7 @@ void ClusterManagerImpl::postThreadLocalClusterUpdateNonVirtual( tls_.runOnAllThreads( [info = cm_cluster.cluster().info(), params = std::move(params), add_or_update_cluster, load_balancer_factory](OptRef cluster_manager) { + ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr; if (add_or_update_cluster) { if (cluster_manager->thread_local_clusters_.count(info->name()) > 0) { ENVOY_LOG(debug, "updating TLS cluster {}", info->name()); @@ -968,15 +946,9 @@ void ClusterManagerImpl::postThreadLocalClusterUpdateNonVirtual( ENVOY_LOG(debug, "adding TLS cluster {}", info->name()); } - auto thread_local_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry( - *cluster_manager, info, load_balancer_factory); - cluster_manager->thread_local_clusters_[info->name()].reset(thread_local_cluster); - // TODO(mattklein123): It would be better if update callbacks were done after the initial - // cluster member is seeded, assuming it is. In the interest of minimal change this is - // deferred for a future change. 
- for (auto& cb : cluster_manager->update_callbacks_) { - cb->onClusterAddOrUpdate(*thread_local_cluster); - } + new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info, + load_balancer_factory); + cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster); } for (const auto& per_priority : params.per_priority_update_params_) { @@ -985,6 +957,12 @@ void ClusterManagerImpl::postThreadLocalClusterUpdateNonVirtual( per_priority.locality_weights_, per_priority.hosts_added_, per_priority.hosts_removed_, per_priority.overprovisioning_factor_); } + + if (new_cluster != nullptr) { + for (auto& cb : cluster_manager->update_callbacks_) { + cb->onClusterAddOrUpdate(*new_cluster); + } + } }); } @@ -1059,22 +1037,17 @@ ProtobufTypes::MessagePtr ClusterManagerImpl::dumpClusterConfigs() { } ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl( - ClusterManagerImpl& parent, Event::Dispatcher& dispatcher) + ClusterManagerImpl& parent, Event::Dispatcher& dispatcher, + const absl::optional& local_cluster_params) : parent_(parent), thread_local_dispatcher_(dispatcher) { // If local cluster is defined then we need to initialize it first. - // TODO(mattklein123): Technically accessing active_clusters_ here is a race condition. This has - // been this way "forever" but should be fixed in a follow up. - if (parent.localClusterName()) { - ENVOY_LOG(debug, "adding TLS local cluster {}", parent.localClusterName().value()); - auto& local_cluster = parent.active_clusters_.at(parent.localClusterName().value()); - thread_local_clusters_[parent.localClusterName().value()] = std::make_unique( - *this, local_cluster->cluster_->info(), local_cluster->loadBalancerFactory()); - } - - local_priority_set_ = - parent.localClusterName() - ? 
&thread_local_clusters_[parent.localClusterName().value()]->priority_set_ - : nullptr; + if (local_cluster_params.has_value()) { + const auto& local_cluster_name = local_cluster_params->info_->name(); + ENVOY_LOG(debug, "adding TLS local cluster {}", local_cluster_name); + thread_local_clusters_[local_cluster_name] = std::make_unique( + *this, local_cluster_params->info_, local_cluster_params->load_balancer_factory_); + local_priority_set_ = &thread_local_clusters_[local_cluster_name]->priority_set_; + } } ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImpl() { diff --git a/source/common/upstream/cluster_manager_impl.h b/source/common/upstream/cluster_manager_impl.h index ebbd98c55b17..e75cc5f7ebc9 100644 --- a/source/common/upstream/cluster_manager_impl.h +++ b/source/common/upstream/cluster_manager_impl.h @@ -327,9 +327,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable; - ThreadLocalClusterManagerImpl(ClusterManagerImpl& parent, Event::Dispatcher& dispatcher); + struct LocalClusterParams { + LoadBalancerFactorySharedPtr load_balancer_factory_; + ClusterInfoConstSharedPtr info_; + }; + + ThreadLocalClusterManagerImpl(ClusterManagerImpl& parent, Event::Dispatcher& dispatcher, + const absl::optional& local_cluster_params); ~ThreadLocalClusterManagerImpl() override; void drainConnPools(const HostVector& hosts); void drainConnPools(HostSharedPtr old_host, ConnPoolsContainer& container); @@ -556,8 +560,6 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable(); + per_thread_load_balancer->lb_ = std::make_unique( + info->stats(), runtime, random, info->lbConfig()); + return per_thread_load_balancer; + }); +} PriorityContextPtr Cluster::linearizePrioritySet(const std::function& skip_predicate) { @@ -39,7 +47,7 @@ Cluster::linearizePrioritySet(const std::function& ski continue; } auto tlc = cluster_manager_.getThreadLocalCluster(cluster); - // It is possible that the cluster doesn't exist, e.g., the 
cluster cloud be deleted or the + // It is possible that the cluster doesn't exist, e.g., the cluster could be deleted or the // cluster hasn't been added by xDS. if (tlc == nullptr) { continue; @@ -92,12 +100,9 @@ void Cluster::refresh(const std::function& skip_predic // Post the priority set to worker threads. // TODO(mattklein123): Remove "this" capture. tls_.runOnAllThreads([this, skip_predicate, cluster_name = this->info()->name()]( - OptRef) { + OptRef per_thread_load_balancer) { PriorityContextPtr priority_context = linearizePrioritySet(skip_predicate); - Upstream::ThreadLocalCluster* cluster = cluster_manager_.getThreadLocalCluster(cluster_name); - ASSERT(cluster != nullptr); - dynamic_cast(cluster->loadBalancer()) - .refresh(std::move(priority_context)); + per_thread_load_balancer->get().refresh(std::move(priority_context)); }); } diff --git a/source/extensions/clusters/aggregate/cluster.h b/source/extensions/clusters/aggregate/cluster.h index e5beeb46ef5a..59f74b08d876 100644 --- a/source/extensions/clusters/aggregate/cluster.h +++ b/source/extensions/clusters/aggregate/cluster.h @@ -3,6 +3,7 @@ #include "envoy/config/cluster/v3/cluster.pb.h" #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h" #include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h" +#include "envoy/thread_local/thread_local_object.h" #include "common/upstream/cluster_factory_impl.h" #include "common/upstream/upstream_impl.h" @@ -28,6 +29,8 @@ struct PriorityContext { using PriorityContextPtr = std::unique_ptr; +class AggregateClusterLoadBalancer; + class Cluster : public Upstream::ClusterImplBase, Upstream::ClusterUpdateCallbacks { public: Cluster(const envoy::config::cluster::v3::Cluster& cluster, @@ -37,6 +40,24 @@ class Cluster : public Upstream::ClusterImplBase, Upstream::ClusterUpdateCallbac Server::Configuration::TransportSocketFactoryContextImpl& factory_context, Stats::ScopePtr&& stats_scope, ThreadLocal::SlotAllocator& tls, bool added_via_api); + struct 
PerThreadLoadBalancer : public ThreadLocal::ThreadLocalObject { AggregateClusterLoadBalancer& get() { // We can refresh before the per-worker LB is created. One of these variants should hold // a non-null value. if (absl::holds_alternative>(lb_)) { ASSERT(absl::get>(lb_) != nullptr); return *absl::get>(lb_); } else { ASSERT(absl::get(lb_) != nullptr); return *absl::get(lb_); } } // For aggregate cluster the per-thread LB is only created once. We need to own it so we // can pre-populate it before the LB is created and handed to the cluster. absl::variant, AggregateClusterLoadBalancer*> lb_; }; // Upstream::Cluster Upstream::Cluster::InitializePhase initializePhase() const override { return Upstream::Cluster::InitializePhase::Secondary; } @@ -54,7 +75,7 @@ class Cluster : public Upstream::ClusterImplBase, Upstream::ClusterUpdateCallbac Upstream::ClusterManager& cluster_manager_; Runtime::Loader& runtime_; Random::RandomGenerator& random_; - ThreadLocal::TypedSlot<> tls_; + ThreadLocal::TypedSlot tls_; const std::vector clusters_; private: @@ -139,8 +160,14 @@ struct AggregateLoadBalancerFactory : public Upstream::LoadBalancerFactory { AggregateLoadBalancerFactory(const Cluster& cluster) : cluster_(cluster) {} // Upstream::LoadBalancerFactory Upstream::LoadBalancerPtr create() override { - return std::make_unique( - cluster_.info()->stats(), cluster_.runtime_, cluster_.random_, cluster_.info()->lbConfig()); + // See comments in PerThreadLoadBalancer above for why the following is done. + auto per_thread_local_balancer = cluster_.tls_.get(); + ASSERT(absl::get>( + per_thread_local_balancer->lb_) != nullptr); + auto to_return = std::move( + absl::get>(per_thread_local_balancer->lb_)); + per_thread_local_balancer->lb_ = to_return.get(); + return to_return; } const Cluster& cluster_; @@ -148,15 +175,14 @@ struct AggregateLoadBalancerFactory : public Upstream::LoadBalancerFactory { // Thread aware load balancer created by the main thread. 
struct AggregateThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer { - AggregateThreadAwareLoadBalancer(const Cluster& cluster) : cluster_(cluster) {} + AggregateThreadAwareLoadBalancer(const Cluster& cluster) + : factory_(std::make_shared(cluster)) {} // Upstream::ThreadAwareLoadBalancer - Upstream::LoadBalancerFactorySharedPtr factory() override { - return std::make_shared(cluster_); - } + Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; } void initialize() override {} - const Cluster& cluster_; + std::shared_ptr factory_; }; class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase< diff --git a/test/common/upstream/cluster_manager_impl_test.cc b/test/common/upstream/cluster_manager_impl_test.cc index d8f745f68fb5..a52eb1cfbb25 100644 --- a/test/common/upstream/cluster_manager_impl_test.cc +++ b/test/common/upstream/cluster_manager_impl_test.cc @@ -749,9 +749,7 @@ class ClusterManagerImplThreadAwareLbTest : public ClusterManagerImplTest { ON_CALL(*cluster1, initializePhase()).WillByDefault(Return(Cluster::InitializePhase::Primary)); create(parseBootstrapFromV3Json(json)); - EXPECT_EQ( - nullptr, - cluster_manager_->getThreadLocalCluster("cluster_0")->loadBalancer().chooseHost(nullptr)); + EXPECT_EQ(nullptr, cluster_manager_->getThreadLocalCluster("cluster_0")); cluster1->prioritySet().getMockHostSet(0)->hosts_ = { makeTestHost(cluster1->info_, "tcp://127.0.0.1:80", time_system_)}; @@ -1902,15 +1900,11 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemove) { EXPECT_CALL(*dns_resolver, resolve(_, _, _)) .WillRepeatedly(DoAll(SaveArg<2>(&dns_callback), Return(&active_dns_query))); create(parseBootstrapFromV3Yaml(yaml)); - EXPECT_FALSE(cluster_manager_->getThreadLocalCluster("cluster_1")->info()->addedViaApi()); - - // Test for no hosts returning the correct values before we have hosts. 
- const auto thread_local_cluster = cluster_manager_->getThreadLocalCluster("cluster_1"); - EXPECT_EQ(nullptr, thread_local_cluster->httpConnPool(ResourcePriority::Default, - Http::Protocol::Http11, nullptr)); - EXPECT_EQ(nullptr, thread_local_cluster->tcpConnPool(ResourcePriority::Default, nullptr)); - EXPECT_EQ(nullptr, thread_local_cluster->tcpConn(nullptr).connection_); - EXPECT_EQ(3UL, factory_.stats_.counter("cluster.cluster_1.upstream_cx_none_healthy").value()); + const auto all_clusters = cluster_manager_->clusters(); + EXPECT_TRUE(all_clusters.warming_clusters_.empty()); + EXPECT_EQ(all_clusters.active_clusters_.size(), 1); + EXPECT_FALSE(all_clusters.active_clusters_.at("cluster_1").get().info()->addedViaApi()); + EXPECT_EQ(nullptr, cluster_manager_->getThreadLocalCluster("cluster_1")); // Set up for an initialize callback. ReadyWatcher initialized; @@ -2052,7 +2046,11 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveWithTls) { EXPECT_CALL(*dns_resolver, resolve(_, _, _)) .WillRepeatedly(DoAll(SaveArg<2>(&dns_callback), Return(&active_dns_query))); create(parseBootstrapFromV3Yaml(yaml)); - EXPECT_FALSE(cluster_manager_->getThreadLocalCluster("cluster_1")->info()->addedViaApi()); + const auto all_clusters = cluster_manager_->clusters(); + EXPECT_TRUE(all_clusters.warming_clusters_.empty()); + EXPECT_EQ(all_clusters.active_clusters_.size(), 1); + EXPECT_FALSE(all_clusters.active_clusters_.at("cluster_1").get().info()->addedViaApi()); + EXPECT_EQ(nullptr, cluster_manager_->getThreadLocalCluster("cluster_1")); NiceMock example_com_context; ON_CALL(example_com_context, upstreamTransportSocketOptions()) @@ -2072,23 +2070,6 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveWithTls) { ON_CALL(ibm_com_context, upstreamTransportSocketOptions()) .WillByDefault(Return(std::make_shared("ibm.com"))); - // Test for no hosts returning the correct values before we have hosts. 
- const auto thread_local_cluster = cluster_manager_->getThreadLocalCluster("cluster_1"); - EXPECT_EQ(nullptr, thread_local_cluster->httpConnPool(ResourcePriority::Default, - Http::Protocol::Http11, nullptr)); - EXPECT_EQ(nullptr, thread_local_cluster->tcpConnPool(ResourcePriority::Default, nullptr)); - EXPECT_EQ(nullptr, thread_local_cluster->tcpConn(nullptr).connection_); - - EXPECT_EQ(nullptr, - thread_local_cluster->tcpConnPool(ResourcePriority::Default, &example_com_context)); - EXPECT_EQ(nullptr, thread_local_cluster->tcpConn(&ibm_com_context).connection_); - - EXPECT_EQ(nullptr, - thread_local_cluster->tcpConnPool(ResourcePriority::Default, &ibm_com_context)); - EXPECT_EQ(nullptr, thread_local_cluster->tcpConn(&ibm_com_context).connection_); - - EXPECT_EQ(7UL, factory_.stats_.counter("cluster.cluster_1.upstream_cx_none_healthy").value()); - // Set up for an initialize callback. ReadyWatcher initialized; cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); }); @@ -2393,7 +2374,11 @@ TEST_F(ClusterManagerImplTest, DynamicHostRemoveDefaultPriority) { EXPECT_CALL(*dns_resolver, resolve(_, _, _)) .WillRepeatedly(DoAll(SaveArg<2>(&dns_callback), Return(&active_dns_query))); create(parseBootstrapFromV3Yaml(yaml)); - EXPECT_FALSE(cluster_manager_->getThreadLocalCluster("cluster_1")->info()->addedViaApi()); + const auto all_clusters = cluster_manager_->clusters(); + EXPECT_TRUE(all_clusters.warming_clusters_.empty()); + EXPECT_EQ(all_clusters.active_clusters_.size(), 1); + EXPECT_FALSE(all_clusters.active_clusters_.at("cluster_1").get().info()->addedViaApi()); + EXPECT_EQ(nullptr, cluster_manager_->getThreadLocalCluster("cluster_1")); dns_callback(Network::DnsResolver::ResolutionStatus::Success, TestUtility::makeDnsResponse({"127.0.0.2"})); @@ -2475,7 +2460,11 @@ TEST_F(ClusterManagerImplTest, ConnPoolDestroyWithDraining) { EXPECT_CALL(*dns_resolver, resolve(_, _, _)) .WillRepeatedly(DoAll(SaveArg<2>(&dns_callback), Return(&active_dns_query))); 
create(parseBootstrapFromV3Yaml(yaml)); - EXPECT_FALSE(cluster_manager_->getThreadLocalCluster("cluster_1")->info()->addedViaApi()); + const auto all_clusters = cluster_manager_->clusters(); + EXPECT_TRUE(all_clusters.warming_clusters_.empty()); + EXPECT_EQ(all_clusters.active_clusters_.size(), 1); + EXPECT_FALSE(all_clusters.active_clusters_.at("cluster_1").get().info()->addedViaApi()); + EXPECT_EQ(nullptr, cluster_manager_->getThreadLocalCluster("cluster_1")); dns_callback(Network::DnsResolver::ResolutionStatus::Success, TestUtility::makeDnsResponse({"127.0.0.2"})); @@ -2531,8 +2520,10 @@ TEST_F(ClusterManagerImplTest, OriginalDstInitialization) { // Set up for an initialize callback. cluster_manager_->setInitializedCb([&]() -> void { initialized.ready(); }); - - EXPECT_FALSE(cluster_manager_->getThreadLocalCluster("cluster_1")->info()->addedViaApi()); + const auto all_clusters = cluster_manager_->clusters(); + EXPECT_TRUE(all_clusters.warming_clusters_.empty()); + EXPECT_EQ(all_clusters.active_clusters_.size(), 1); + EXPECT_FALSE(all_clusters.active_clusters_.at("cluster_1").get().info()->addedViaApi()); // Test for no hosts returning the correct values before we have hosts. EXPECT_EQ(nullptr, @@ -3867,7 +3858,10 @@ TEST_F(ClusterManagerImplTest, ConnPoolsDrainedOnHostSetChange) { ClusterUpdateCallbacksHandlePtr cb = cluster_manager_->addThreadLocalClusterUpdateCallbacks(*callbacks); - EXPECT_FALSE(cluster_manager_->getThreadLocalCluster("cluster_1")->info()->addedViaApi()); + const auto all_clusters = cluster_manager_->clusters(); + EXPECT_TRUE(all_clusters.warming_clusters_.empty()); + EXPECT_EQ(all_clusters.active_clusters_.size(), 1); + EXPECT_FALSE(all_clusters.active_clusters_.at("cluster_1").get().info()->addedViaApi()); // Verify that we get no hosts when the HostSet is empty. 
EXPECT_EQ(nullptr, diff --git a/test/extensions/filters/http/jwt_authn/filter_config_test.cc b/test/extensions/filters/http/jwt_authn/filter_config_test.cc index 6aae1a8e1b39..8406afd8c50d 100644 --- a/test/extensions/filters/http/jwt_authn/filter_config_test.cc +++ b/test/extensions/filters/http/jwt_authn/filter_config_test.cc @@ -168,7 +168,7 @@ TEST(HttpJwtAuthnFilterConfigTest, VerifyTLSLifetime) { NiceMock server_context; // Make sure that the thread callbacks are not invoked inline. - server_context.thread_local_.defer_data = true; + server_context.thread_local_.defer_data_ = true; { // Scope in all the things that the filter depends on, so they are destroyed as we leave the // scope. diff --git a/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc b/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc index cec880ba9289..1211390c625d 100644 --- a/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc +++ b/test/extensions/filters/network/redis_proxy/conn_pool_impl_test.cc @@ -1174,7 +1174,7 @@ TEST_F(RedisConnPoolImplTest, AskRedirectionFailure) { TEST_F(RedisConnPoolImplTest, MakeRequestAndRedirectFollowedByDelete) { cm_.initializeThreadLocalClusters({"fake_cluster"}); - tls_.defer_delete = true; + tls_.defer_delete_ = true; std::unique_ptr> store = std::make_unique>(); cluster_refresh_manager_ = diff --git a/test/extensions/tracers/lightstep/lightstep_tracer_impl_test.cc b/test/extensions/tracers/lightstep/lightstep_tracer_impl_test.cc index 239e30a85624..6513314c0ba0 100644 --- a/test/extensions/tracers/lightstep/lightstep_tracer_impl_test.cc +++ b/test/extensions/tracers/lightstep/lightstep_tracer_impl_test.cc @@ -201,7 +201,7 @@ TEST_F(LightStepDriverTest, DeferredTlsInitialization) { auto propagation_mode = Common::Ot::OpenTracingDriver::PropagationMode::TracerNative; - tls_.defer_data = true; + tls_.defer_data_ = true; cm_.initializeClusters({"fake_cluster"}, {}); 
ON_CALL(*cm_.active_clusters_["fake_cluster"]->info_, features()) .WillByDefault(Return(Upstream::ClusterInfo::Features::HTTP2)); diff --git a/test/mocks/thread_local/mocks.h b/test/mocks/thread_local/mocks.h index 7b3097f5e0fa..e735c543b0e1 100644 --- a/test/mocks/thread_local/mocks.h +++ b/test/mocks/thread_local/mocks.h @@ -50,25 +50,32 @@ class MockInstance : public Instance { ~SlotImpl() override { // Do not actually clear slot data during shutdown. This mimics the production code. - // The defer_delete mimics the recycle() code with Bookkeeper. - if (!parent_.shutdown_ && !parent_.defer_delete) { + // The defer_delete mimics the slot being deleted on the main thread but the update not yet + // getting to a worker. + if (!parent_.shutdown_ && !parent_.defer_delete_) { EXPECT_LT(index_, parent_.data_.size()); parent_.data_[index_].reset(); } } // ThreadLocal::Slot - ThreadLocalObjectSharedPtr get() override { return parent_.data_[index_]; } + ThreadLocalObjectSharedPtr get() override { + EXPECT_TRUE(was_set_); + return parent_.data_[index_]; + } bool currentThreadRegistered() override { return parent_.registered_; } void runOnAllThreads(const UpdateCb& cb) override { + EXPECT_TRUE(was_set_); parent_.runOnAllThreads([cb, this]() { cb(parent_.data_[index_]); }); } void runOnAllThreads(const UpdateCb& cb, const Event::PostCb& main_callback) override { + EXPECT_TRUE(was_set_); parent_.runOnAllThreads([cb, this]() { cb(parent_.data_[index_]); }, main_callback); } void set(InitializeCb cb) override { - if (parent_.defer_data) { + was_set_ = true; + if (parent_.defer_data_) { parent_.deferred_data_[index_] = cb; } else { parent_.data_[index_] = cb(parent_.dispatcher_); @@ -77,6 +84,7 @@ class MockInstance : public Instance { MockInstance& parent_; const uint32_t index_; + bool was_set_{}; // set() must be called before other functions. 
}; void call() { @@ -90,10 +98,10 @@ class MockInstance : public Instance { testing::NiceMock dispatcher_; std::vector data_; std::vector deferred_data_; - bool defer_data{}; + bool defer_data_{}; bool shutdown_{}; bool registered_{true}; - bool defer_delete{}; + bool defer_delete_{}; }; } // namespace ThreadLocal