Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cluster manager: use clusters() instead of get() for main thread cluster validation #14204

Merged
merged 5 commits into from
Dec 2, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions include/envoy/upstream/cluster_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -166,14 +166,33 @@ class ClusterManager {
virtual void
initializeSecondaryClusters(const envoy::config::bootstrap::v3::Bootstrap& bootstrap) PURE;

using ClusterInfoMap = absl::node_hash_map<std::string, std::reference_wrapper<const Cluster>>;
using ClusterInfoMap = absl::flat_hash_map<std::string, std::reference_wrapper<const Cluster>>;
struct ClusterInfoMaps {
bool hasCluster(absl::string_view cluster) const {
return active_clusters_.find(cluster) != active_clusters_.end() ||
warming_clusters_.find(cluster) != warming_clusters_.end();
}

ClusterConstOptRef getCluster(absl::string_view cluster) {
auto active_cluster = active_clusters_.find(cluster);
if (active_cluster != active_clusters_.end()) {
return active_cluster->second;
}
auto warming_cluster = warming_clusters_.find(cluster);
if (warming_cluster != warming_clusters_.end()) {
return warming_cluster->second;
}
return absl::nullopt;
}

ClusterInfoMap active_clusters_;
ClusterInfoMap warming_clusters_;
};

/**
* @return ClusterInfoMap all current clusters including active and warming.
*
* NOTE: This method is only thread safe on the main thread. It should not be called elsewhere.
*/
virtual ClusterInfoMaps clusters() PURE;

Expand All @@ -195,8 +214,12 @@ class ClusterManager {
* the case of dynamic clusters, subsequent event loop iterations may invalidate this pointer.
* If information about the cluster needs to be kept, use the ThreadLocalCluster::info() method to
* obtain cluster information that is safe to store.
*
* NOTE: This method may return nullptr even if the cluster exists (if it hasn't been warmed yet,
* propagated to workers, etc.). Use clusters() for general configuration checking on the main
* thread.
*/
virtual ThreadLocalCluster* get(absl::string_view cluster) PURE;
virtual ThreadLocalCluster* getThreadLocalCluster(absl::string_view cluster) PURE;

/**
* Allocate a load balanced HTTP connection pool for a cluster. This is *per-thread* so that
Expand Down
1 change: 1 addition & 0 deletions include/envoy/upstream/upstream.h
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,7 @@ class Cluster {
};

using ClusterSharedPtr = std::shared_ptr<Cluster>;
using ClusterConstOptRef = absl::optional<std::reference_wrapper<const Cluster>>;

} // namespace Upstream
} // namespace Envoy
22 changes: 13 additions & 9 deletions source/common/config/utility.cc
Original file line number Diff line number Diff line change
Expand Up @@ -57,25 +57,29 @@ void Utility::translateApiConfigSource(
Protobuf::util::TimeUtil::MillisecondsToDuration(refresh_delay_ms));
}

void Utility::checkCluster(absl::string_view error_prefix, absl::string_view cluster_name,
Upstream::ClusterManager& cm, bool allow_added_via_api) {
Upstream::ThreadLocalCluster* cluster = cm.get(cluster_name);
if (cluster == nullptr) {
Upstream::ClusterConstOptRef Utility::checkCluster(absl::string_view error_prefix,
absl::string_view cluster_name,
Upstream::ClusterManager& cm,
bool allow_added_via_api) {
const auto cluster = cm.clusters().getCluster(cluster_name);
if (!cluster.has_value()) {
throw EnvoyException(fmt::format("{}: unknown cluster '{}'", error_prefix, cluster_name));
}

if (!allow_added_via_api && cluster->info()->addedViaApi()) {
if (!allow_added_via_api && cluster->get().info()->addedViaApi()) {
throw EnvoyException(fmt::format(
"{}: invalid cluster '{}': currently only static (non-CDS) clusters are supported",
error_prefix, cluster_name));
}
return cluster;
}

void Utility::checkClusterAndLocalInfo(absl::string_view error_prefix,
absl::string_view cluster_name, Upstream::ClusterManager& cm,
const LocalInfo::LocalInfo& local_info) {
checkCluster(error_prefix, cluster_name, cm);
Upstream::ClusterConstOptRef
Utility::checkClusterAndLocalInfo(absl::string_view error_prefix, absl::string_view cluster_name,
Upstream::ClusterManager& cm,
const LocalInfo::LocalInfo& local_info) {
checkLocalInfo(error_prefix, local_info);
return checkCluster(error_prefix, cluster_name, cm);
}

void Utility::checkLocalInfo(absl::string_view error_prefix,
Expand Down
14 changes: 9 additions & 5 deletions source/common/config/utility.h
Original file line number Diff line number Diff line change
Expand Up @@ -118,20 +118,24 @@ class Utility {
* @param cm supplies the cluster manager.
* @param allow_added_via_api indicates whether a cluster is allowed to be added via api
* rather than be a static resource from the bootstrap config.
* @return the main thread cluster if it exists.
*/
static void checkCluster(absl::string_view error_prefix, absl::string_view cluster_name,
Upstream::ClusterManager& cm, bool allow_added_via_api = false);
static Upstream::ClusterConstOptRef checkCluster(absl::string_view error_prefix,
absl::string_view cluster_name,
Upstream::ClusterManager& cm,
bool allow_added_via_api = false);

/**
* Check cluster/local info for API config sanity. Throws on error.
* @param error_prefix supplies the prefix to use in error messages.
* @param cluster_name supplies the cluster name to check.
* @param cm supplies the cluster manager.
* @param local_info supplies the local info.
* @return the main thread cluster if it exists.
*/
static void checkClusterAndLocalInfo(absl::string_view error_prefix,
absl::string_view cluster_name, Upstream::ClusterManager& cm,
const LocalInfo::LocalInfo& local_info);
static Upstream::ClusterConstOptRef
checkClusterAndLocalInfo(absl::string_view error_prefix, absl::string_view cluster_name,
Upstream::ClusterManager& cm, const LocalInfo::LocalInfo& local_info);

/**
* Check local info for API config sanity. Throws on error.
Expand Down
2 changes: 1 addition & 1 deletion source/common/grpc/async_client_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ AsyncStreamImpl::AsyncStreamImpl(AsyncClientImpl& parent, absl::string_view serv
callbacks_(callbacks), options_(options) {}

void AsyncStreamImpl::initialize(bool buffer_body_for_retry) {
if (parent_.cm_.get(parent_.remote_cluster_name_) == nullptr) {
if (parent_.cm_.getThreadLocalCluster(parent_.remote_cluster_name_) == nullptr) {
callbacks_.onRemoteClose(Status::WellKnownGrpcStatus::Unavailable, "Cluster not available");
http_reset_ = true;
return;
Expand Down
5 changes: 3 additions & 2 deletions source/common/http/conn_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1294,8 +1294,9 @@ void ConnectionManagerImpl::ActiveStream::refreshCachedRoute(const Router::Route
if (nullptr == filter_manager_.streamInfo().route_entry_) {
cached_cluster_info_ = nullptr;
} else {
Upstream::ThreadLocalCluster* local_cluster = connection_manager_.cluster_manager_.get(
filter_manager_.streamInfo().route_entry_->clusterName());
Upstream::ThreadLocalCluster* local_cluster =
connection_manager_.cluster_manager_.getThreadLocalCluster(
filter_manager_.streamInfo().route_entry_->clusterName());
cached_cluster_info_ = (nullptr == local_cluster) ? nullptr : local_cluster->info();
}

Expand Down
30 changes: 18 additions & 12 deletions source/common/router/config_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -902,7 +902,8 @@ RouteConstSharedPtr RouteEntryImplBase::clusterEntry(const Http::HeaderMap& head
true);
}

void RouteEntryImplBase::validateClusters(Upstream::ClusterManager& cm) const {
void RouteEntryImplBase::validateClusters(
const Upstream::ClusterManager::ClusterInfoMaps& cluster_info_maps) const {
if (isDirectResponse()) {
return;
}
Expand All @@ -914,12 +915,12 @@ void RouteEntryImplBase::validateClusters(Upstream::ClusterManager& cm) const {
// In the future we might decide to also have a config option that turns off checks for static
// route tables. This would enable the all CDS with static route table case.
if (!cluster_name_.empty()) {
if (!cm.get(cluster_name_)) {
if (!cluster_info_maps.hasCluster(cluster_name_)) {
throw EnvoyException(fmt::format("route: unknown cluster '{}'", cluster_name_));
}
} else if (!weighted_clusters_.empty()) {
for (const WeightedClusterEntrySharedPtr& cluster : weighted_clusters_) {
if (!cm.get(cluster->clusterName())) {
if (!cluster_info_maps.hasCluster(cluster->clusterName())) {
throw EnvoyException(
fmt::format("route: unknown weighted cluster '{}'", cluster->clusterName()));
}
Expand Down Expand Up @@ -1075,11 +1076,12 @@ RouteConstSharedPtr ConnectRouteEntryImpl::matches(const Http::RequestHeaderMap&
return nullptr;
}

VirtualHostImpl::VirtualHostImpl(const envoy::config::route::v3::VirtualHost& virtual_host,
const ConfigImpl& global_route_config,
Server::Configuration::ServerFactoryContext& factory_context,
Stats::Scope& scope, ProtobufMessage::ValidationVisitor& validator,
bool validate_clusters)
VirtualHostImpl::VirtualHostImpl(
const envoy::config::route::v3::VirtualHost& virtual_host,
const ConfigImpl& global_route_config,
Server::Configuration::ServerFactoryContext& factory_context, Stats::Scope& scope,
ProtobufMessage::ValidationVisitor& validator,
const absl::optional<Upstream::ClusterManager::ClusterInfoMaps>& validation_clusters)
: stat_name_pool_(factory_context.scope().symbolTable()),
stat_name_(stat_name_pool_.add(virtual_host.name())),
vcluster_scope_(scope.createScope(virtual_host.name() + ".vcluster")),
Expand Down Expand Up @@ -1142,11 +1144,11 @@ VirtualHostImpl::VirtualHostImpl(const envoy::config::route::v3::VirtualHost& vi
NOT_REACHED_GCOVR_EXCL_LINE;
}

if (validate_clusters) {
routes_.back()->validateClusters(factory_context.clusterManager());
if (validation_clusters.has_value()) {
routes_.back()->validateClusters(*validation_clusters);
for (const auto& shadow_policy : routes_.back()->shadowPolicies()) {
ASSERT(!shadow_policy->cluster().empty());
if (!factory_context.clusterManager().get(shadow_policy->cluster())) {
if (!validation_clusters->hasCluster(shadow_policy->cluster())) {
throw EnvoyException(
fmt::format("route: unknown shadow cluster '{}'", shadow_policy->cluster()));
}
Expand Down Expand Up @@ -1228,10 +1230,14 @@ RouteMatcher::RouteMatcher(const envoy::config::route::v3::RouteConfiguration& r
Server::Configuration::ServerFactoryContext& factory_context,
ProtobufMessage::ValidationVisitor& validator, bool validate_clusters)
: vhost_scope_(factory_context.scope().createScope("vhost")) {
absl::optional<Upstream::ClusterManager::ClusterInfoMaps> validation_clusters;
if (validate_clusters) {
validation_clusters = factory_context.clusterManager().clusters();
}
for (const auto& virtual_host_config : route_config.virtual_hosts()) {
VirtualHostSharedPtr virtual_host(new VirtualHostImpl(virtual_host_config, global_route_config,
factory_context, *vhost_scope_, validator,
validate_clusters));
validation_clusters));
for (const std::string& domain_name : virtual_host_config.domains()) {
const std::string domain = Http::LowerCaseString(domain_name).get();
bool duplicate_found = false;
Expand Down
12 changes: 7 additions & 5 deletions source/common/router/config_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ class ConfigImpl;
*/
class VirtualHostImpl : public VirtualHost {
public:
VirtualHostImpl(const envoy::config::route::v3::VirtualHost& virtual_host,
const ConfigImpl& global_route_config,
Server::Configuration::ServerFactoryContext& factory_context, Stats::Scope& scope,
ProtobufMessage::ValidationVisitor& validator, bool validate_clusters);
VirtualHostImpl(
const envoy::config::route::v3::VirtualHost& virtual_host,
const ConfigImpl& global_route_config,
Server::Configuration::ServerFactoryContext& factory_context, Stats::Scope& scope,
ProtobufMessage::ValidationVisitor& validator,
const absl::optional<Upstream::ClusterManager::ClusterInfoMaps>& validation_clusters);

RouteConstSharedPtr getRouteFromEntries(const RouteCallback& cb,
const Http::RequestHeaderMap& headers,
Expand Down Expand Up @@ -461,7 +463,7 @@ class RouteEntryImplBase : public RouteEntry,

bool matchRoute(const Http::RequestHeaderMap& headers, const StreamInfo::StreamInfo& stream_info,
uint64_t random_value) const;
void validateClusters(Upstream::ClusterManager& cm) const;
void validateClusters(const Upstream::ClusterManager::ClusterInfoMaps& cluster_info_maps) const;

// Router::RouteEntry
const std::string& clusterName() const override;
Expand Down
3 changes: 2 additions & 1 deletion source/common/router/router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -405,7 +405,8 @@ Http::FilterHeadersStatus Filter::decodeHeaders(Http::RequestHeaderMap& headers,
route_entry_->clusterName());
};
}
Upstream::ThreadLocalCluster* cluster = config_.cm_.get(route_entry_->clusterName());
Upstream::ThreadLocalCluster* cluster =
config_.cm_.getThreadLocalCluster(route_entry_->clusterName());
if (!cluster) {
config_.stats_.no_cluster_.inc();
ENVOY_STREAM_LOG(debug, "unknown cluster '{}'", *callbacks_, route_entry_->clusterName());
Expand Down
2 changes: 1 addition & 1 deletion source/common/router/shadow_writer_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ void ShadowWriterImpl::shadow(const std::string& cluster, Http::RequestMessagePt
// It's possible that the cluster specified in the route configuration no longer exists due
// to a CDS removal. Check that it still exists before shadowing.
// TODO(mattklein123): Optimally we would have a stat but for now just fix the crashing issue.
if (!cm_.get(cluster)) {
if (!cm_.getThreadLocalCluster(cluster)) {
ENVOY_LOG(debug, "shadow cluster '{}' does not exist", cluster);
return;
}
Expand Down
4 changes: 3 additions & 1 deletion source/common/tcp_proxy/tcp_proxy.cc
Original file line number Diff line number Diff line change
Expand Up @@ -383,12 +383,14 @@ Network::FilterStatus Filter::initializeUpstreamConnection() {

const std::string& cluster_name = route_ ? route_->clusterName() : EMPTY_STRING;

Upstream::ThreadLocalCluster* thread_local_cluster = cluster_manager_.get(cluster_name);
Upstream::ThreadLocalCluster* thread_local_cluster =
cluster_manager_.getThreadLocalCluster(cluster_name);

if (thread_local_cluster) {
ENVOY_CONN_LOG(debug, "Creating connection to cluster {}", read_callbacks_->connection(),
cluster_name);
} else {
ENVOY_CONN_LOG(debug, "Cluster not found {}", read_callbacks_->connection(), cluster_name);
config_->stats().downstream_cx_no_route_.inc();
getStreamInfo().setResponseFlag(StreamInfo::ResponseFlag::NoRouteFound);
onInitFailure(UpstreamFailureReason::NoRoute);
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/cluster_manager_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -842,7 +842,7 @@ void ClusterManagerImpl::updateClusterCounts() {
cm_stats_.warming_clusters_.set(warming_clusters_.size());
}

ThreadLocalCluster* ClusterManagerImpl::get(absl::string_view cluster) {
ThreadLocalCluster* ClusterManagerImpl::getThreadLocalCluster(absl::string_view cluster) {
ThreadLocalClusterManagerImpl& cluster_manager = *tls_;

auto entry = cluster_manager.thread_local_clusters_.find(cluster);
Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/cluster_manager_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ class ClusterManagerImpl : public ClusterManager, Logger::Loggable<Logger::Id::u
}

const ClusterSet& primaryClusters() override { return primary_clusters_; }
ThreadLocalCluster* get(absl::string_view cluster) override;
ThreadLocalCluster* getThreadLocalCluster(absl::string_view cluster) override;

using ClusterManager::httpConnPoolForCluster;

Expand Down
2 changes: 1 addition & 1 deletion source/common/upstream/cluster_update_tracker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ namespace Upstream {
ClusterUpdateTracker::ClusterUpdateTracker(ClusterManager& cm, const std::string& cluster_name)
: cluster_name_(cluster_name),
cluster_update_callbacks_handle_(cm.addThreadLocalClusterUpdateCallbacks(*this)) {
Upstream::ThreadLocalCluster* cluster = cm.get(cluster_name_);
Upstream::ThreadLocalCluster* cluster = cm.getThreadLocalCluster(cluster_name_);
cluster_info_ = cluster ? cluster->info() : nullptr;
}

Expand Down
6 changes: 3 additions & 3 deletions source/extensions/clusters/aggregate/cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Cluster::linearizePrioritySet(const std::function<bool(const std::string&)>& ski
if (skip_predicate(cluster)) {
continue;
}
auto tlc = cluster_manager_.get(cluster);
auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
// It is possible that the cluster doesn't exist, e.g., the cluster cloud be deleted or the
// cluster hasn't been added by xDS.
if (tlc == nullptr) {
Expand Down Expand Up @@ -67,7 +67,7 @@ Cluster::linearizePrioritySet(const std::function<bool(const std::string&)>& ski

void Cluster::startPreInit() {
for (const auto& cluster : clusters_) {
auto tlc = cluster_manager_.get(cluster);
auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
// It is possible when initializing the cluster, the included cluster doesn't exist. e.g., the
// cluster could be added dynamically by xDS.
if (tlc == nullptr) {
Expand All @@ -94,7 +94,7 @@ void Cluster::refresh(const std::function<bool(const std::string&)>& skip_predic
tls_.runOnAllThreads([this, skip_predicate, cluster_name = this->info()->name()](
OptRef<ThreadLocal::ThreadLocalObject>) {
PriorityContextPtr priority_context = linearizePrioritySet(skip_predicate);
Upstream::ThreadLocalCluster* cluster = cluster_manager_.get(cluster_name);
Upstream::ThreadLocalCluster* cluster = cluster_manager_.getThreadLocalCluster(cluster_name);
ASSERT(cluster != nullptr);
dynamic_cast<AggregateClusterLoadBalancer&>(cluster->loadBalancer())
.refresh(std::move(priority_context));
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/common/wasm/context.cc
Original file line number Diff line number Diff line change
Expand Up @@ -892,7 +892,7 @@ WasmResult Context::httpCall(absl::string_view cluster, const Pairs& request_hea
return WasmResult::BadArgument;
}
auto cluster_string = std::string(cluster);
if (clusterManager().get(cluster_string) == nullptr) {
if (clusterManager().getThreadLocalCluster(cluster_string) == nullptr) {
return WasmResult::BadArgument;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ void RawHttpClientImpl::check(RequestCallbacks& callbacks,

// It's possible that the cluster specified in the filter configuration no longer exists due to a
// CDS removal.
if (cm_.get(cluster) == nullptr) {
if (cm_.getThreadLocalCluster(cluster) == nullptr) {
// TODO(dio): Add stats related to this.
ENVOY_LOG(debug, "ext_authz cluster '{}' does not exist", cluster);
callbacks_->onComplete(std::make_unique<Response>(errorResponse()));
Expand Down
2 changes: 1 addition & 1 deletion source/extensions/filters/http/common/jwks_fetcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ class JwksFetcherImpl : public JwksFetcher,

// Check if cluster is configured, fail the request if not.
// Otherwise cm_.httpAsyncClientForCluster will throw exception.
if (cm_.get(uri.cluster()) == nullptr) {
if (cm_.getThreadLocalCluster(uri.cluster()) == nullptr) {
ENVOY_LOG(error, "{}: fetch pubkey [uri = {}] failed: [cluster = {}] is not configured",
__func__, uri.uri(), uri.cluster());
complete_ = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ Http::FilterHeadersStatus ProxyFilter::decodeHeaders(Http::RequestHeaderMap& hea
return Http::FilterHeadersStatus::Continue;
}

Upstream::ThreadLocalCluster* cluster = config_->clusterManager().get(route_entry->clusterName());
Upstream::ThreadLocalCluster* cluster =
config_->clusterManager().getThreadLocalCluster(route_entry->clusterName());
if (!cluster) {
return Http::FilterHeadersStatus::Continue;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ void HealthCheckFilter::onComplete() {
details = &RcDetails::get().HealthCheckClusterHealthy;
const std::string& cluster_name = item.first;
const uint64_t min_healthy_percentage = static_cast<uint64_t>(item.second);
auto* cluster = clusterManager.get(cluster_name);
auto* cluster = clusterManager.getThreadLocalCluster(cluster_name);
if (cluster == nullptr) {
// If the cluster does not exist at all, consider the service unhealthy.
final_status = Http::Code::ServiceUnavailable;
Expand Down
Loading