-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
Copy pathcluster.cc
190 lines (165 loc) · 8.08 KB
/
cluster.cc
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
#include "extensions/clusters/aggregate/cluster.h"
#include "envoy/config/cluster/v3/cluster.pb.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.h"
#include "envoy/extensions/clusters/aggregate/v3/cluster.pb.validate.h"
#include "common/common/assert.h"
namespace Envoy {
namespace Extensions {
namespace Clusters {
namespace Aggregate {
Cluster::Cluster(const envoy::config::cluster::v3::Cluster& cluster,
const envoy::extensions::clusters::aggregate::v3::ClusterConfig& config,
Upstream::ClusterManager& cluster_manager, Runtime::Loader& runtime,
Random::RandomGenerator& random,
Server::Configuration::TransportSocketFactoryContextImpl& factory_context,
Stats::ScopePtr&& stats_scope, ThreadLocal::SlotAllocator& tls, bool added_via_api)
: Upstream::ClusterImplBase(cluster, runtime, factory_context, std::move(stats_scope),
added_via_api, factory_context.dispatcher().timeSource()),
cluster_manager_(cluster_manager), runtime_(runtime), random_(random), tls_(tls),
clusters_(config.clusters().begin(), config.clusters().end()) {}
PriorityContextPtr
Cluster::linearizePrioritySet(const std::function<bool(const std::string&)>& skip_predicate) {
PriorityContextPtr priority_context = std::make_unique<PriorityContext>();
uint32_t next_priority_after_linearizing = 0;
// Linearize the priority set. e.g. for clusters [C_0, C_1, C_2] referred in aggregate cluster
// C_0 [P_0, P_1, P_2]
// C_1 [P_0, P_1]
// C_2 [P_0, P_1, P_2, P_3]
// The linearization result is:
// [C_0.P_0, C_0.P_1, C_0.P_2, C_1.P_0, C_1.P_1, C_2.P_0, C_2.P_1, C_2.P_2, C_2.P_3]
// and the traffic will be distributed among these priorities.
for (const auto& cluster : clusters_) {
if (skip_predicate(cluster)) {
continue;
}
auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
// It is possible that the cluster doesn't exist, e.g., the cluster cloud be deleted or the
// cluster hasn't been added by xDS.
if (tlc == nullptr) {
continue;
}
uint32_t priority_in_current_cluster = 0;
for (const auto& host_set : tlc->prioritySet().hostSetsPerPriority()) {
if (!host_set->hosts().empty()) {
priority_context->priority_set_.updateHosts(
next_priority_after_linearizing, Upstream::HostSetImpl::updateHostsParams(*host_set),
host_set->localityWeights(), host_set->hosts(), {}, host_set->overprovisioningFactor());
priority_context->priority_to_cluster_.emplace_back(
std::make_pair(priority_in_current_cluster, tlc));
priority_context->cluster_and_priority_to_linearized_priority_[std::make_pair(
cluster, priority_in_current_cluster)] = next_priority_after_linearizing;
next_priority_after_linearizing++;
}
priority_in_current_cluster++;
}
}
return priority_context;
}
void Cluster::startPreInit() {
for (const auto& cluster : clusters_) {
auto tlc = cluster_manager_.getThreadLocalCluster(cluster);
// It is possible when initializing the cluster, the included cluster doesn't exist. e.g., the
// cluster could be added dynamically by xDS.
if (tlc == nullptr) {
continue;
}
// Add callback for clusters initialized before aggregate cluster.
tlc->prioritySet().addMemberUpdateCb(
[this, cluster](const Upstream::HostVector&, const Upstream::HostVector&) {
ENVOY_LOG(debug, "member update for cluster '{}' in aggregate cluster '{}'", cluster,
this->info()->name());
refresh();
});
}
refresh();
handle_ = cluster_manager_.addThreadLocalClusterUpdateCallbacks(*this);
onPreInitComplete();
}
void Cluster::refresh(const std::function<bool(const std::string&)>& skip_predicate) {
// Post the priority set to worker threads.
// TODO(mattklein123): Remove "this" capture.
tls_.runOnAllThreads([this, skip_predicate, cluster_name = this->info()->name()](
OptRef<ThreadLocal::ThreadLocalObject>) {
PriorityContextPtr priority_context = linearizePrioritySet(skip_predicate);
Upstream::ThreadLocalCluster* cluster = cluster_manager_.getThreadLocalCluster(cluster_name);
ASSERT(cluster != nullptr);
dynamic_cast<AggregateClusterLoadBalancer&>(cluster->loadBalancer())
.refresh(std::move(priority_context));
});
}
void Cluster::onClusterAddOrUpdate(Upstream::ThreadLocalCluster& cluster) {
if (std::find(clusters_.begin(), clusters_.end(), cluster.info()->name()) != clusters_.end()) {
ENVOY_LOG(debug, "adding or updating cluster '{}' for aggregate cluster '{}'",
cluster.info()->name(), info()->name());
refresh();
cluster.prioritySet().addMemberUpdateCb(
[this](const Upstream::HostVector&, const Upstream::HostVector&) { refresh(); });
}
}
void Cluster::onClusterRemoval(const std::string& cluster_name) {
// The onClusterRemoval callback is called before the thread local cluster is removed. There
// will be a dangling pointer to the thread local cluster if the deleted cluster is not skipped
// when we refresh the load balancer.
if (std::find(clusters_.begin(), clusters_.end(), cluster_name) != clusters_.end()) {
ENVOY_LOG(debug, "removing cluster '{}' from aggreagte cluster '{}'", cluster_name,
info()->name());
refresh([cluster_name](const std::string& c) { return cluster_name == c; });
}
}
absl::optional<uint32_t> AggregateClusterLoadBalancer::LoadBalancerImpl::hostToLinearizedPriority(
const Upstream::HostDescription& host) const {
auto it = priority_context_.cluster_and_priority_to_linearized_priority_.find(
std::make_pair(host.cluster().name(), host.priority()));
if (it != priority_context_.cluster_and_priority_to_linearized_priority_.end()) {
return it->second;
} else {
// The HostSet can change due to CDS/EDS updates between retries.
return absl::nullopt;
}
}
Upstream::HostConstSharedPtr
AggregateClusterLoadBalancer::LoadBalancerImpl::chooseHost(Upstream::LoadBalancerContext* context) {
const Upstream::HealthyAndDegradedLoad* priority_loads = nullptr;
if (context != nullptr) {
priority_loads = &context->determinePriorityLoad(
priority_set_, per_priority_load_,
[this](const auto& host) { return hostToLinearizedPriority(host); });
} else {
priority_loads = &per_priority_load_;
}
const auto priority_pair =
choosePriority(random_.random(), priority_loads->healthy_priority_load_,
priority_loads->degraded_priority_load_);
AggregateLoadBalancerContext aggregate_context(
context, priority_pair.second,
priority_context_.priority_to_cluster_[priority_pair.first].first);
Upstream::ThreadLocalCluster* cluster =
priority_context_.priority_to_cluster_[priority_pair.first].second;
return cluster->loadBalancer().chooseHost(&aggregate_context);
}
Upstream::HostConstSharedPtr
AggregateClusterLoadBalancer::chooseHost(Upstream::LoadBalancerContext* context) {
if (load_balancer_) {
return load_balancer_->chooseHost(context);
}
return nullptr;
}
std::pair<Upstream::ClusterImplBaseSharedPtr, Upstream::ThreadAwareLoadBalancerPtr>
ClusterFactory::createClusterWithConfig(
const envoy::config::cluster::v3::Cluster& cluster,
const envoy::extensions::clusters::aggregate::v3::ClusterConfig& proto_config,
Upstream::ClusterFactoryContext& context,
Server::Configuration::TransportSocketFactoryContextImpl& socket_factory_context,
Stats::ScopePtr&& stats_scope) {
auto new_cluster =
std::make_shared<Cluster>(cluster, proto_config, context.clusterManager(), context.runtime(),
context.api().randomGenerator(), socket_factory_context,
std::move(stats_scope), context.tls(), context.addedViaApi());
auto lb = std::make_unique<AggregateThreadAwareLoadBalancer>(*new_cluster);
return std::make_pair(new_cluster, std::move(lb));
}
REGISTER_FACTORY(ClusterFactory, Upstream::ClusterFactory);
} // namespace Aggregate
} // namespace Clusters
} // namespace Extensions
} // namespace Envoy