// Package orchestrator is responsible for mapping inbound xDS client
// requests to the correct aggregated key, forwarding a representative request
// to the upstream origin server, and managing the lifecycle of downstream and
// upstream connections and their associated streams. It implements
// go-control-plane's Cache interface in order to receive xDS-based requests,
// send responses, and handle gRPC streams.
package orchestrator
import (
"context"
"fmt"
"sync"
"time"
discovery "github.com/envoyproxy/go-control-plane/envoy/api/v2"
gcp "github.com/envoyproxy/go-control-plane/pkg/cache/v2"
bootstrapv1 "github.com/envoyproxy/xds-relay/pkg/api/bootstrap/v1"
"github.com/uber-go/tally"
"github.com/envoyproxy/xds-relay/internal/app/cache"
"github.com/envoyproxy/xds-relay/internal/app/mapper"
"github.com/envoyproxy/xds-relay/internal/app/metrics"
"github.com/envoyproxy/xds-relay/internal/app/upstream"
"github.com/envoyproxy/xds-relay/internal/pkg/log"
)
const (
component = "orchestrator"
// unaggregatedPrefix is the prefix used to label discovery requests that
// could not be successfully mapped to an aggregation rule.
unaggregatedPrefix = "unaggregated_"
)
// Orchestrator has the following responsibilities:
//
// 1. Aggregates similar requests according to the aggregation keyer
// configuration.
// 2. Maintains long lived streams with the upstream origin server for each
// representative discovery request.
// 3. Maintains long lived streams with each downstream xDS client using
// go-control-plane's CreateWatch function.
// 4. When new responses are available upstream, orchestrator relays and fans
// out the response back on the streams associated with the xDS clients.
// 5. Updates the xds-relay cache with the latest state of the world.
//
// Orchestrator will be using go-control-plane's gRPC server implementation to
// maintain the fanout to downstream clients. go-control-plane keeps an open
// connection with each downstream xDS client. When orchestrator receives an
// upstream response from the forwarded sample request (via a long lived
// channel), Orchestrator will cache the response, and fanout to the
// downstreams by supplying responses to the individual channels corresponding
// to each downstream connection (watcher). See the CreateWatch function for
// more details.
type Orchestrator interface {
gcp.Cache
// This is called by the main shutdown handler and tests to clean up
// open channels.
shutdown(ctx context.Context)
GetReadOnlyCache() cache.ReadOnlyCache
GetDownstreamAggregatedKeys() (map[string]bool, error)
}
type orchestrator struct {
mapper mapper.Mapper
cache cache.Cache
upstreamClient upstream.Client
logger log.Logger
scope tally.Scope
downstreamResponseMap downstreamResponseMap
upstreamResponseMap upstreamResponseMap
}
// New instantiates the mapper, cache, upstream client components necessary for
// the orchestrator to operate and returns an instance of the instantiated
// orchestrator.
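//
// A minimal wiring sketch (illustrative only; gcpserver stands for
// go-control-plane's v2 server package, and requestMapper, upstreamClient,
// and cacheConfig are placeholders constructed elsewhere):
//
//	orch := orchestrator.New(ctx, logger, scope, requestMapper, upstreamClient, cacheConfig)
//	grpcServer := gcpserver.NewServer(ctx, orch, nil)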
func New(
ctx context.Context,
logger log.Logger,
scope tally.Scope,
mapper mapper.Mapper,
upstreamClient upstream.Client,
cacheConfig *bootstrapv1.Cache,
) Orchestrator {
orchestrator := &orchestrator{
logger: logger.Named(component),
scope: scope.SubScope(metrics.ScopeOrchestrator),
mapper: mapper,
upstreamClient: upstreamClient,
downstreamResponseMap: newDownstreamResponseMap(),
upstreamResponseMap: newUpstreamResponseMap(),
}
// Initialize cache.
cache, err := cache.NewCache(
int(cacheConfig.MaxEntries),
orchestrator.onCacheEvicted,
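// Cache TTL, taken from the bootstrap config (only the nanoseconds
// field is read here).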
time.Duration(cacheConfig.Ttl.Nanos)*time.Nanosecond,
logger,
scope,
)
if err != nil {
orchestrator.logger.With("error", err).Panic(ctx, "failed to initialize cache")
}
orchestrator.cache = cache
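// Listen in the background for server shutdown: when ctx is canceled,
// shutdown tears down all open upstream streams.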
go orchestrator.shutdown(ctx)
return orchestrator
}
// CreateWatch is managed by the underlying go-control-plane gRPC server.
//
// Orchestrator will populate the response channel with the corresponding
// responses once new resources are available upstream.
//
// If the channel is closed prior to cancellation of the watch, an
// unrecoverable error has occurred in the producer, and the consumer should
// close the corresponding stream.
//
// Cancel is an optional function to release resources in the producer. If
// provided, the consumer may call this function multiple times.
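//
// A rough consumer-side sketch of these semantics (illustrative only; in
// xds-relay the actual consumer is go-control-plane's gRPC server):
//
//	respCh, cancel := orch.CreateWatch(req)
//	if resp, ok := <-respCh; ok {
//	    _ = resp // forward the response to the downstream client
//	} else {
//	    // Channel closed: unrecoverable producer error, close the stream.
//	}
//	if cancel != nil {
//	    cancel()
//	}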
func (o *orchestrator) CreateWatch(req gcp.Request) (chan gcp.Response, func()) {
ctx := context.Background()
// If this is the first time we're seeing the request from the
// downstream client, initialize a channel to feed future responses.
responseChannel := o.downstreamResponseMap.createChannel(&req)
aggregatedKey, err := o.mapper.GetKey(req)
if err != nil {
// Can't map the request to an aggregated key. Log and continue,
// forwarding the request upstream without aggregation.
o.logger.With("error", err, "request_type", req.GetTypeUrl()).Warn(ctx, "failed to map to aggregated key")
// Mimic the aggregated key.
// TODO (https://github.com/envoyproxy/xds-relay/issues/56). Can we
// condense this key but still make it granular enough to uniquely
// identify a request?
aggregatedKey = fmt.Sprintf("%s%s", unaggregatedPrefix, req.String())
}
o.logger.With(
"node_id", req.GetNode().GetId(),
"request_type", req.GetTypeUrl(),
"request_version", req.GetVersionInfo(),
"nonce", req.GetResponseNonce(),
"error", req.GetErrorDetail(),
"aggregated_key", aggregatedKey,
).Debug(ctx, "creating watch")
// Register the watch for future responses.
err = o.cache.AddRequest(aggregatedKey, &req)
if err != nil {
// If we fail to register the watch, we need to kill this stream by
// closing the response channel.
o.logger.With("error", err).With("aggregated_key", aggregatedKey).With(
"request_node", req.GetNode()).Error(ctx, "failed to add watch")
metrics.OrchestratorWatchErrorsSubscope(o.scope, aggregatedKey).Counter(metrics.ErrorRegisterWatch).Inc(1)
closedChannel := o.downstreamResponseMap.delete(&req)
return closedChannel, nil
}
metrics.OrchestratorWatchSubscope(o.scope, aggregatedKey).Counter(metrics.OrchestratorWatchCreated).Inc(1)
// Check if we have a cached response first.
cached, err := o.cache.Fetch(aggregatedKey)
if err != nil {
// Log and continue; the watch can still be served by a fresh upstream response.
o.logger.With("error", err).With("aggregated_key", aggregatedKey).Warn(ctx, "failed to fetch aggregated key")
}
if cached != nil && cached.Resp != nil && cached.Resp.GetVersionInfo() != req.GetVersionInfo() {
// If we have a cached response and the version is different,
// immediately push the result to the response channel.
go func() {
err := responseChannel.addResponse(convertToGcpResponse(cached.Resp, req))
if err != nil {
// Sanity check that the channel isn't blocked. This shouldn't
// ever happen since the channel is newly created. Regardless,
// continue to create the watch.
o.logger.With("aggregated_key", aggregatedKey, "error", err).Warn(context.Background(),
"failed to push cached response")
metrics.OrchestratorWatchErrorsSubscope(o.scope, aggregatedKey).Counter(metrics.ErrorChannelFull).Inc(1)
} else {
metrics.OrchestratorWatchSubscope(o.scope, aggregatedKey).Counter(metrics.OrchestratorWatchFanouts).Inc(1)
}
}()
}
// Check if we have an upstream stream open for this aggregated key. If not,
// open a stream with the representative request.
if !o.upstreamResponseMap.exists(aggregatedKey) {
upstreamResponseChan, shutdown, err := o.upstreamClient.OpenStream(req)
if err != nil {
// TODO implement retry/back-off logic on error scenario.
// https://github.com/envoyproxy/xds-relay/issues/68
o.logger.With(
"error", err,
"aggregated_key", aggregatedKey,
).Error(ctx, "Failed to open stream to origin server")
} else {
respChannel, upstreamOpenedPreviously := o.upstreamResponseMap.add(aggregatedKey, upstreamResponseChan)
if upstreamOpenedPreviously {
// A stream was opened previously due to a race between
// concurrent downstreams for the same aggregated key, between
// exists and add operations. In this event, simply close the
// slower stream and return the existing one.
shutdown()
} else {
// Spin up a go routine to watch for upstream responses.
// One routine is opened per aggregate key.
o.logger.With("aggregated_key", aggregatedKey).Debug(ctx, "watching upstream")
go o.watchUpstream(ctx, aggregatedKey, respChannel.response, respChannel.done, shutdown)
}
}
}
return responseChannel.channel, o.onCancelWatch(aggregatedKey, &req)
}
// Fetch implements the polling method of the config cache using a non-empty request.
func (o *orchestrator) Fetch(context.Context, discovery.DiscoveryRequest) (gcp.Response, error) {
return nil, fmt.Errorf("Not implemented")
}
// GetReadOnlyCache returns the request/response cache with only read-only methods exposed.
func (o *orchestrator) GetReadOnlyCache() cache.ReadOnlyCache {
return o.cache.GetReadOnlyCache()
}
// GetDownstreamAggregatedKeys returns the aggregated keys for all requests stored in the downstream response map.
func (o *orchestrator) GetDownstreamAggregatedKeys() (map[string]bool, error) {
keys, err := o.downstreamResponseMap.getAggregatedKeys(&o.mapper)
if err != nil {
o.logger.With("error", err).Error(context.Background(), "Unable to get keys")
}
return keys, err
}
// watchUpstream is intended to be called in a go routine, to receive incoming
// responses, cache the response, and fan out to downstream clients or
// "watchers". There is a corresponding go routine for each aggregated key.
//
// This goroutine continually listens for upstream responses from the passed
// `responseChannel`. For each response, we will:
// - cache this latest response, replacing the previous stale response.
// - retrieve the downstream watchers from the cache for this `aggregated key`.
// - trigger the fanout process to downstream watchers by pushing to the
// individual downstream response channels in separate go routines.
//
// Additionally this function tracks a `done` channel and a `shutdownUpstream`
// function. `done` is a channel that gets closed in two places:
// 1. when server shutdown is triggered. See the `shutdown` function in this
// file for more information.
// 2. when cache TTL expires for this aggregated key. See the `onCacheEvicted`
// function in this file for more information.
// When the `done` channel is closed, we call the `shutdownUpstream` callback
// function. This will signify to the upstream client that we no longer require
// responses from this stream because the downstream connections have been
// terminated. The upstream client will clean up the stream accordingly.
func (o *orchestrator) watchUpstream(
ctx context.Context,
aggregatedKey string,
responseChannel <-chan *discovery.DiscoveryResponse,
done <-chan bool,
shutdownUpstream func(),
) {
for {
select {
case x, more := <-responseChannel:
if !more {
// A problem occurred fetching the response upstream. Log and return.
// TODO implement retry/back-off logic on error scenario.
// https://github.com/envoyproxy/xds-relay/issues/68
o.logger.With("aggregated_key", aggregatedKey).Error(ctx, "upstream error")
metrics.OrchestratorWatchErrorsSubscope(o.scope, aggregatedKey).Counter(metrics.ErrorUpstreamFailure).Inc(1)
return
}
// Cache the response.
_, err := o.cache.SetResponse(aggregatedKey, *x)
if err != nil {
// TODO if set fails, we may need to retry upstream as well.
// Currently the fallback is to rely on a future response, but
// that probably isn't ideal.
// https://github.com/envoyproxy/xds-relay/issues/70
//
// If we fail to cache the new response, log and return the old one.
o.logger.With("error", err).With("aggregated_key", aggregatedKey).
Error(ctx, "Failed to cache the response")
}
// Get downstream watchers and fan out.
// We retrieve from cache rather than directly fanning out the
// newly received response because the cache does additional
// resource serialization.
cached, err := o.cache.Fetch(aggregatedKey)
if err != nil {
o.logger.With("error", err).With("aggregated_key", aggregatedKey).Error(ctx, "cache fetch failed")
// Can't do anything because we don't know who the watchers
// are. Drop the response.
} else {
if cached == nil || cached.Resp == nil {
// If the cache is empty, there is nothing to fan out. This is
// a sanity check: we shouldn't reach this since we just set
// the response, but it can happen if the cache TTL is very short.
o.logger.With("aggregated_key", aggregatedKey).Error(ctx, "attempted to fan out with no cached response")
metrics.OrchestratorWatchErrorsSubscope(o.scope, aggregatedKey).Counter(metrics.ErrorCacheMiss).Inc(1)
} else {
// Golden path.
o.logger.With(
"aggregated_key", aggregatedKey,
"response_type", cached.Resp.GetTypeUrl(),
"response_version", cached.Resp.GetVersionInfo(),
).Debug(ctx, "response fanout initiated")
o.fanout(cached.Resp, cached.Requests, aggregatedKey)
}
}
case <-done:
// Exit when signaled that the stream has closed.
o.logger.With("aggregated_key", aggregatedKey).Info(ctx, "shutting down upstream watch")
shutdownUpstream()
return
}
}
}
// fanout pushes the response to the response channels of all open downstream
// watchers in parallel.
func (o *orchestrator) fanout(resp *discovery.DiscoveryResponse, watchers map[*gcp.Request]bool, aggregatedKey string) {
var wg sync.WaitGroup
for watch := range watchers {
wg.Add(1)
go func(watch *gcp.Request) {
defer wg.Done()
if channel, ok := o.downstreamResponseMap.get(watch); ok {
if err := channel.addResponse(convertToGcpResponse(resp, *watch)); err != nil {
// If the channel is blocked, we simply drop the response and log an error.
// Alternative possibilities are discussed here:
// https://github.com/envoyproxy/xds-relay/pull/53#discussion_r420325553
o.logger.With("aggregated_key", aggregatedKey).With("node_id", watch.GetNode().GetId()).
Error(context.Background(), "channel blocked during fanout")
metrics.OrchestratorWatchErrorsSubscope(o.scope, aggregatedKey).Counter(metrics.ErrorChannelFull).Inc(1)
return
}
o.logger.With(
"aggregated_key", aggregatedKey,
"node_id", watch.GetNode().GetId(),
"response_version", resp.GetVersionInfo(),
"response_type", resp.GetTypeUrl(),
).Debug(context.Background(), "response sent")
metrics.OrchestratorWatchSubscope(o.scope, aggregatedKey).Counter(metrics.OrchestratorWatchFanouts).Inc(1)
}
}(watch)
}
// Wait for all fanouts to complete.
wg.Wait()
}
// onCacheEvicted is called when the cache evicts a response due to TTL or
// other reasons. When this happens, we need to clean up open streams.
// We shut down both the downstream watchers and the upstream stream.
func (o *orchestrator) onCacheEvicted(key string, resource cache.Resource) {
// TODO Potential for improvements here to handle the thundering herd
// problem: https://github.com/envoyproxy/xds-relay/issues/71
o.downstreamResponseMap.deleteAll(resource.Requests)
o.upstreamResponseMap.delete(key)
}
// onCancelWatch cleans up the cached watch when called.
func (o *orchestrator) onCancelWatch(aggregatedKey string, req *gcp.Request) func() {
return func() {
o.downstreamResponseMap.delete(req)
metrics.OrchestratorWatchSubscope(o.scope, aggregatedKey).Counter(metrics.OrchestratorWatchCanceled).Inc(1)
if err := o.cache.DeleteRequest(aggregatedKey, req); err != nil {
o.logger.With(
"aggregated_key", aggregatedKey,
"error", err,
).Warn(context.Background(), "Failed to delete from cache")
}
}
}
// shutdown closes all upstream connections when ctx.Done is called.
func (o *orchestrator) shutdown(ctx context.Context) {
<-ctx.Done()
o.upstreamResponseMap.deleteAll()
}
// convertToGcpResponse constructs the go-control-plane response from the
// cached response.
func convertToGcpResponse(resp *discovery.DiscoveryResponse, req gcp.Request) gcp.PassthroughResponse {
return gcp.PassthroughResponse{
Request: req,
DiscoveryResponse: resp,
}
}