Skip to content

Commit

Permalink
cluster manager: initialization cleanups (#14382)
Browse files Browse the repository at this point in the history
Final follow up from #13906. This PR does:
1) Simplify the logic during startup by making thread local clusters
   only appear after a cluster has been initialized. This is now uniform
   both for bootstrap clusters as well as CDS clusters, making the logic
   simpler to follow.
2) Aggregate cluster needed fixes due to assumptions on startup
   existence of the thread local cluster. This change also
   fixes #14119
3) Make TLS mocks verify that set() is called before other functions.

Signed-off-by: Matt Klein <[email protected]>
  • Loading branch information
mattklein123 authored Dec 14, 2020
1 parent 76bcbd7 commit 0e6047b
Show file tree
Hide file tree
Showing 9 changed files with 139 additions and 131 deletions.
105 changes: 39 additions & 66 deletions source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,37 +371,25 @@ ClusterManagerImpl::ClusterManagerImpl(
cm_stats_.cluster_added_.add(bootstrap.static_resources().clusters().size());
updateClusterCounts();

if (local_cluster_name_ &&
(active_clusters_.find(local_cluster_name_.value()) == active_clusters_.end())) {
throw EnvoyException(
fmt::format("local cluster '{}' must be defined", local_cluster_name_.value()));
absl::optional<ThreadLocalClusterManagerImpl::LocalClusterParams> local_cluster_params;
if (local_cluster_name_) {
auto local_cluster = active_clusters_.find(local_cluster_name_.value());
if (local_cluster == active_clusters_.end()) {
throw EnvoyException(
fmt::format("local cluster '{}' must be defined", local_cluster_name_.value()));
}
local_cluster_params.emplace();
local_cluster_params->info_ = local_cluster->second->cluster().info();
local_cluster_params->load_balancer_factory_ = local_cluster->second->loadBalancerFactory();
local_cluster->second->setAddedOrUpdated();
}

// Once the initial set of static bootstrap clusters are created (including the local cluster),
// we can instantiate the thread local cluster manager.
tls_.set([this](Event::Dispatcher& dispatcher) {
return std::make_shared<ThreadLocalClusterManagerImpl>(*this, dispatcher);
tls_.set([this, local_cluster_params](Event::Dispatcher& dispatcher) {
return std::make_shared<ThreadLocalClusterManagerImpl>(*this, dispatcher, local_cluster_params);
});

// For active clusters that exist in bootstrap, post an empty thread local cluster update to
// populate them.
// TODO(mattklein123): It would be nice if we did not do this and instead all thread local cluster
// creation happened as part of the cluster init flow, however there are certain cases that depend
// on this behavior including route checking. It may be possible to fix static route checking to
// not depend on this behavior, but for now this is consistent with the way we have always done
// this so in the interest of minimal change it is not being done now.
for (auto& cluster : active_clusters_) {
// Skip posting the thread local cluster which is created as part of the thread local cluster
// manager constructor. See the TODO in that code for eventually cleaning this up.
if (local_cluster_name_ && local_cluster_name_.value() == cluster.first) {
continue;
}

// Avoid virtual call in the constructor. This only impacts tests. Remove this when fixing
// the above TODO.
postThreadLocalClusterUpdateNonVirtual(*cluster.second, ThreadLocalClusterUpdateParams());
}

// We can now potentially create the CDS API once the backing cluster exists.
if (dyn_resources.has_cds_config()) {
cds_api_ = factory_.createCds(dyn_resources.cds_config(), *this);
Expand Down Expand Up @@ -536,14 +524,11 @@ void ClusterManagerImpl::onClusterInit(ClusterManagerCluster& cm_cluster) {
params.per_priority_update_params_.emplace_back(host_set->priority(), host_set->hosts(),
HostVector{});
}
// At this point the update is posted if either there are actual updates or the cluster has
// not been added yet. The latter can only happen with dynamic cluster as static clusters are
// added immediately.
// TODO(mattklein123): Per related TODOs we will see if we can centralize all logic so that
// clusters only get added in this path and all of the special casing can be removed.
if (!params.per_priority_update_params_.empty() || !cm_cluster.addedOrUpdated()) {
postThreadLocalClusterUpdate(cm_cluster, std::move(params));
}
// NOTE: In all cases *other* than the local cluster, this is when a cluster is added/updated
// The local cluster must currently be statically defined and must exist prior to other
// clusters being added/updated. We could gate the below update on hosts being available on
// the cluster or the cluster not already existing, but the special logic is not worth it.
postThreadLocalClusterUpdate(cm_cluster, std::move(params));
}

bool ClusterManagerImpl::scheduleUpdate(ClusterManagerCluster& cluster, uint32_t priority,
Expand Down Expand Up @@ -929,21 +914,13 @@ void ClusterManagerImpl::postThreadLocalDrainConnections(const Cluster& cluster,
});
}

void ClusterManagerImpl::postThreadLocalClusterUpdateNonVirtual(
ClusterManagerCluster& cm_cluster, ThreadLocalClusterUpdateParams&& params) {
const bool is_local_cluster = local_cluster_name_.has_value() &&
local_cluster_name_.value() == cm_cluster.cluster().info()->name();
void ClusterManagerImpl::postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster,
ThreadLocalClusterUpdateParams&& params) {
bool add_or_update_cluster = false;
if (!cm_cluster.addedOrUpdated()) {
add_or_update_cluster = true;
cm_cluster.setAddedOrUpdated();
}
if (is_local_cluster) {
// TODO(mattklein123): This is needed because of the special case of how local cluster is
// initialized in the thread local cluster manager constructor. This will all be cleaned up
// in a follow up.
add_or_update_cluster = false;
}

LoadBalancerFactorySharedPtr load_balancer_factory;
if (add_or_update_cluster) {
Expand All @@ -961,22 +938,17 @@ void ClusterManagerImpl::postThreadLocalClusterUpdateNonVirtual(
tls_.runOnAllThreads(
[info = cm_cluster.cluster().info(), params = std::move(params), add_or_update_cluster,
load_balancer_factory](OptRef<ThreadLocalClusterManagerImpl> cluster_manager) {
ThreadLocalClusterManagerImpl::ClusterEntry* new_cluster = nullptr;
if (add_or_update_cluster) {
if (cluster_manager->thread_local_clusters_.count(info->name()) > 0) {
ENVOY_LOG(debug, "updating TLS cluster {}", info->name());
} else {
ENVOY_LOG(debug, "adding TLS cluster {}", info->name());
}

auto thread_local_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(
*cluster_manager, info, load_balancer_factory);
cluster_manager->thread_local_clusters_[info->name()].reset(thread_local_cluster);
// TODO(mattklein123): It would be better if update callbacks were done after the initial
// cluster member is seeded, assuming it is. In the interest of minimal change this is
// deferred for a future change.
for (auto& cb : cluster_manager->update_callbacks_) {
cb->onClusterAddOrUpdate(*thread_local_cluster);
}
new_cluster = new ThreadLocalClusterManagerImpl::ClusterEntry(*cluster_manager, info,
load_balancer_factory);
cluster_manager->thread_local_clusters_[info->name()].reset(new_cluster);
}

for (const auto& per_priority : params.per_priority_update_params_) {
Expand All @@ -985,6 +957,12 @@ void ClusterManagerImpl::postThreadLocalClusterUpdateNonVirtual(
per_priority.locality_weights_, per_priority.hosts_added_,
per_priority.hosts_removed_, per_priority.overprovisioning_factor_);
}

if (new_cluster != nullptr) {
for (auto& cb : cluster_manager->update_callbacks_) {
cb->onClusterAddOrUpdate(*new_cluster);
}
}
});
}

Expand Down Expand Up @@ -1059,22 +1037,17 @@ ProtobufTypes::MessagePtr ClusterManagerImpl::dumpClusterConfigs() {
}

ClusterManagerImpl::ThreadLocalClusterManagerImpl::ThreadLocalClusterManagerImpl(
ClusterManagerImpl& parent, Event::Dispatcher& dispatcher)
ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
const absl::optional<LocalClusterParams>& local_cluster_params)
: parent_(parent), thread_local_dispatcher_(dispatcher) {
// If local cluster is defined then we need to initialize it first.
// TODO(mattklein123): Technically accessing active_clusters_ here is a race condition. This has
// been this way "forever" but should be fixed in a follow up.
if (parent.localClusterName()) {
ENVOY_LOG(debug, "adding TLS local cluster {}", parent.localClusterName().value());
auto& local_cluster = parent.active_clusters_.at(parent.localClusterName().value());
thread_local_clusters_[parent.localClusterName().value()] = std::make_unique<ClusterEntry>(
*this, local_cluster->cluster_->info(), local_cluster->loadBalancerFactory());
}

local_priority_set_ =
parent.localClusterName()
? &thread_local_clusters_[parent.localClusterName().value()]->priority_set_
: nullptr;
if (local_cluster_params.has_value()) {
const auto& local_cluster_name = local_cluster_params->info_->name();
ENVOY_LOG(debug, "adding TLS local cluster {}", local_cluster_name);
thread_local_clusters_[local_cluster_name] = std::make_unique<ClusterEntry>(
*this, local_cluster_params->info_, local_cluster_params->load_balancer_factory_);
local_priority_set_ = &thread_local_clusters_[local_cluster_name]->priority_set_;
}
}

ClusterManagerImpl::ThreadLocalClusterManagerImpl::~ThreadLocalClusterManagerImpl() {
Expand Down
14 changes: 8 additions & 6 deletions source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -327,9 +327,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
};

virtual void postThreadLocalClusterUpdate(ClusterManagerCluster& cm_cluster,
ThreadLocalClusterUpdateParams&& params) {
return postThreadLocalClusterUpdateNonVirtual(cm_cluster, std::move(params));
}
ThreadLocalClusterUpdateParams&& params);

private:
/**
Expand Down Expand Up @@ -422,7 +420,13 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u

using ClusterEntryPtr = std::unique_ptr<ClusterEntry>;

ThreadLocalClusterManagerImpl(ClusterManagerImpl& parent, Event::Dispatcher& dispatcher);
struct LocalClusterParams {
LoadBalancerFactorySharedPtr load_balancer_factory_;
ClusterInfoConstSharedPtr info_;
};

ThreadLocalClusterManagerImpl(ClusterManagerImpl& parent, Event::Dispatcher& dispatcher,
const absl::optional<LocalClusterParams>& local_cluster_params);
~ThreadLocalClusterManagerImpl() override;
void drainConnPools(const HostVector& hosts);
void drainConnPools(HostSharedPtr old_host, ConnPoolsContainer& container);
Expand Down Expand Up @@ -556,8 +560,6 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
ClusterMap& cluster_map);
void onClusterInit(ClusterManagerCluster& cluster);
void postThreadLocalHealthFailure(const HostSharedPtr& host);
void postThreadLocalClusterUpdateNonVirtual(ClusterManagerCluster& cm_cluster,
ThreadLocalClusterUpdateParams&& params);
void updateClusterCounts();
void clusterWarmingToActive(const std::string& cluster_name);
static void maybePrefetch(ThreadLocalClusterManagerImpl::ClusterEntry& cluster_entry,
Expand Down
19 changes: 12 additions & 7 deletions source/extensions/clusters/aggregate/cluster.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "extensions/clusters/aggregate/cluster.h"

#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/event/dispatcher.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h"

Expand All @@ -20,7 +21,14 @@ Cluster::Cluster(const envoy::config::cluster::v3::Cluster& cluster,
: Upstream::ClusterImplBase(cluster, runtime, factory_context, std::move(stats_scope),
added_via_api, factory_context.dispatcher().timeSource()),
cluster_manager_(cluster_manager), runtime_(runtime), random_(random), tls_(tls),
clusters_(config.clusters().begin(), config.clusters().end()) {}
clusters_(config.clusters().begin(), config.clusters().end()) {
tls_.set([info = info(), &runtime, &random](Event::Dispatcher&) {
auto per_thread_load_balancer = std::make_unique<PerThreadLoadBalancer>();
per_thread_load_balancer->lb_ = std::make_unique<AggregateClusterLoadBalancer>(
info->stats(), runtime, random, info->lbConfig());
return per_thread_load_balancer;
});
}

PriorityContextPtr
Cluster::linearizePrioritySet(const std::function<bool(const std::string&)>& skip_predicate) {
Expand All @@ -39,7 +47,7 @@ Cluster::linearizePrioritySet(const std::function<bool(const std::string&)>& ski
continue;
}
auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
// It is possible that the cluster doesn't exist, e.g., the cluster cloud be deleted or the
// It is possible that the cluster doesn't exist, e.g., the cluster could be deleted or the
// cluster hasn't been added by xDS.
if (tlc == nullptr) {
continue;
Expand Down Expand Up @@ -92,12 +100,9 @@ void Cluster::refresh(const std::function<bool(const std::string&)>& skip_predic
// Post the priority set to worker threads.
// TODO(mattklein123): Remove "this" capture.
tls_.runOnAllThreads([this, skip_predicate, cluster_name = this->info()->name()](
OptRef<ThreadLocal::ThreadLocalObject>) {
OptRef<PerThreadLoadBalancer> per_thread_load_balancer) {
PriorityContextPtr priority_context = linearizePrioritySet(skip_predicate);
Upstream::ThreadLocalCluster* cluster = cluster_manager_.getThreadLocalCluster(cluster_name);
ASSERT(cluster != nullptr);
dynamic_cast<AggregateClusterLoadBalancer&>(cluster->loadBalancer())
.refresh(std::move(priority_context));
per_thread_load_balancer->get().refresh(std::move(priority_context));
});
}

Expand Down
42 changes: 34 additions & 8 deletions source/extensions/clusters/aggregate/cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h"
#include "envoy/thread_local/thread_local_object.h"

#include "common/upstream/cluster_factory_impl.h"
#include "common/upstream/upstream_impl.h"
Expand All @@ -28,6 +29,8 @@ struct PriorityContext {

using PriorityContextPtr = std::unique_ptr<PriorityContext>;

class AggregateClusterLoadBalancer;

class Cluster : public Upstream::ClusterImplBase, Upstream::ClusterUpdateCallbacks {
public:
Cluster(const envoy::config::cluster::v3::Cluster& cluster,
Expand All @@ -37,6 +40,24 @@ class Cluster : public Upstream::ClusterImplBase, Upstream::ClusterUpdateCallbac
Server::Configuration::TransportSocketFactoryContextImpl& factory_context,
Stats::ScopePtr&& stats_scope, ThreadLocal::SlotAllocator& tls, bool added_via_api);

struct PerThreadLoadBalancer : public ThreadLocal::ThreadLocalObject {
AggregateClusterLoadBalancer& get() {
// We can refresh before the per-worker LB is created. One of these variants should hold
// a non-null value.
if (absl::holds_alternative<std::unique_ptr<AggregateClusterLoadBalancer>>(lb_)) {
ASSERT(absl::get<std::unique_ptr<AggregateClusterLoadBalancer>>(lb_) != nullptr);
return *absl::get<std::unique_ptr<AggregateClusterLoadBalancer>>(lb_);
} else {
ASSERT(absl::get<AggregateClusterLoadBalancer*>(lb_) != nullptr);
return *absl::get<AggregateClusterLoadBalancer*>(lb_);
}
}

// For aggregate cluster the per-thread LB is only created once. We need to own it so we
// can pre-populate it before the LB is created and handed to the cluster.
absl::variant<std::unique_ptr<AggregateClusterLoadBalancer>, AggregateClusterLoadBalancer*> lb_;
};

// Upstream::Cluster
Upstream::Cluster::InitializePhase initializePhase() const override {
return Upstream::Cluster::InitializePhase::Secondary;
Expand All @@ -54,7 +75,7 @@ class Cluster : public Upstream::ClusterImplBase, Upstream::ClusterUpdateCallbac
Upstream::ClusterManager& cluster_manager_;
Runtime::Loader& runtime_;
Random::RandomGenerator& random_;
ThreadLocal::TypedSlot<> tls_;
ThreadLocal::TypedSlot<PerThreadLoadBalancer> tls_;
const std::vector<std::string> clusters_;

private:
Expand Down Expand Up @@ -139,24 +160,29 @@ struct AggregateLoadBalancerFactory : public Upstream::LoadBalancerFactory {
AggregateLoadBalancerFactory(const Cluster& cluster) : cluster_(cluster) {}
// Upstream::LoadBalancerFactory
Upstream::LoadBalancerPtr create() override {
return std::make_unique<AggregateClusterLoadBalancer>(
cluster_.info()->stats(), cluster_.runtime_, cluster_.random_, cluster_.info()->lbConfig());
// See comments in PerThreadLoadBalancer above for why the follow is done.
auto per_thread_local_balancer = cluster_.tls_.get();
ASSERT(absl::get<std::unique_ptr<AggregateClusterLoadBalancer>>(
per_thread_local_balancer->lb_) != nullptr);
auto to_return = std::move(
absl::get<std::unique_ptr<AggregateClusterLoadBalancer>>(per_thread_local_balancer->lb_));
per_thread_local_balancer->lb_ = to_return.get();
return to_return;
}

const Cluster& cluster_;
};

// Thread aware load balancer created by the main thread.
struct AggregateThreadAwareLoadBalancer : public Upstream::ThreadAwareLoadBalancer {
AggregateThreadAwareLoadBalancer(const Cluster& cluster) : cluster_(cluster) {}
AggregateThreadAwareLoadBalancer(const Cluster& cluster)
: factory_(std::make_shared<AggregateLoadBalancerFactory>(cluster)) {}

// Upstream::ThreadAwareLoadBalancer
Upstream::LoadBalancerFactorySharedPtr factory() override {
return std::make_shared<AggregateLoadBalancerFactory>(cluster_);
}
Upstream::LoadBalancerFactorySharedPtr factory() override { return factory_; }
void initialize() override {}

const Cluster& cluster_;
std::shared_ptr<AggregateLoadBalancerFactory> factory_;
};

class ClusterFactory : public Upstream::ConfigurableClusterFactoryBase<
Expand Down
Loading

0 comments on commit 0e6047b

Please sign in to comment.