-
Notifications
You must be signed in to change notification settings - Fork 40
/
Copy pathleader.go
341 lines (299 loc) · 8.96 KB
/
leader.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package main
import (
"context"
"encoding/json"
"reflect"
"sort"
"time"
"github.com/hashicorp/consul/api"
)
type NodeWatchList struct {
Nodes []string
Probes []string
}
func (a *Agent) runLeaderLoop() {
// Arrange to give up any held lock any time we exit the goroutine so
// another agent can pick up without delay.
var lock *api.Lock
defer func() {
if lock != nil {
lock.Unlock()
}
}()
LEADER_WAIT:
select {
case <-a.shutdownCh:
return
default:
}
// Wait to get the leader lock before running snapshots.
a.logger.Info("Trying to obtain leadership...")
if lock == nil {
var err error
lock, err = a.client.LockKey(a.config.KVPath + LeaderKey)
if err != nil {
a.logger.Error("Error trying to create leader lock (will retry)", "error", err)
time.Sleep(retryTime)
goto LEADER_WAIT
}
}
leaderCh, err := lock.Lock(a.shutdownCh)
if err != nil {
if err == api.ErrLockHeld {
a.logger.Error("Unable to use leader lock that was held previously and presumed lost, giving up the lock (will retry)", "error", err)
lock.Unlock()
time.Sleep(retryTime)
goto LEADER_WAIT
} else {
a.logger.Error("Error trying to get leader lock (will retry)", "error", err)
time.Sleep(retryTime)
goto LEADER_WAIT
}
}
if leaderCh == nil {
// This is how the Lock() call lets us know that it quit because
// we closed the shutdown channel.
return
}
a.logger.Info("Obtained leadership")
// Start a goroutine for computing the node watches.
go a.computeWatchedNodes(leaderCh)
for {
select {
case <-leaderCh:
a.logger.Warn("Lost leadership")
goto LEADER_WAIT
case <-a.shutdownCh:
return
}
}
}
// nodesLists builds lists of nodes each agent is responsible for.
func nodeLists(nodes []*api.Node, insts []*api.ServiceEntry,
) (map[string][]string, map[string][]string) {
healthNodes := make(map[string][]string)
pingNodes := make(map[string][]string)
if len(insts) == 0 {
return healthNodes, pingNodes
}
for i, node := range nodes {
idx := i % len(insts)
agentID := insts[idx].Service.ID
// If it's a node to probe, add it to the ping list. Otherwise just add
// it to the list of nodes to be health checked.
if node.Meta != nil {
if v, ok := node.Meta["external-probe"]; ok && v == "true" {
pingNodes[agentID] = append(pingNodes[agentID], node.Node)
continue
}
}
healthNodes[agentID] = append(healthNodes[agentID], node.Node)
}
return healthNodes, pingNodes
}
func (a *Agent) commitOps(ops api.KVTxnOps) bool {
success, results, _, err := a.client.KV().Txn(ops, a.ConsulQueryOption())
if err != nil || !success {
a.logger.Error("Error writing state to KV store", "results", results, "error", err)
// Try again after the wait because we got an error.
return false
}
return true
}
// computeWatchedNodes watches both the list of registered ESM instances and
// the list of external nodes registered in Consul and decides which nodes each
// ESM instance should be in charge of, writing the output to the KV store.
func (a *Agent) computeWatchedNodes(stopCh <-chan struct{}) {
nodeCh := make(chan []*api.Node)
instanceCh := make(chan []*api.ServiceEntry)
go a.watchExternalNodes(nodeCh, stopCh)
go a.watchServiceInstances(instanceCh, stopCh)
externalNodes := <-nodeCh
healthyInstances := <-instanceCh
var prevHealthNodes map[string][]string
var prevPingNodes map[string][]string
// Avoid blocking on first pass
retryTimer := time.After(0)
WATCH_NODES_WAIT:
for {
select {
case <-stopCh:
return
case externalNodes = <-nodeCh:
case healthyInstances = <-instanceCh:
case <-retryTimer:
}
// Next time through block until either nodes or instances are updated
retryTimer = nil
// Wait for some instances to become available, if there are none, then there isn't anything we can do
if len(healthyInstances) == 0 {
retryTimer = time.After(retryTime)
continue
}
healthNodes, pingNodes := nodeLists(externalNodes, healthyInstances)
// Write the KV update as a transaction.
kvOps := &api.KVTxnOp{
Verb: api.KVDeleteTree,
Key: a.kvNodeListPath(),
}
a.HasPartition(func(partition string) {
kvOps.Partition = partition
})
ops := api.KVTxnOps{
kvOps,
}
for _, agent := range healthyInstances {
bytes, _ := json.Marshal(NodeWatchList{
Nodes: healthNodes[agent.Service.ID],
Probes: pingNodes[agent.Service.ID],
})
op := &api.KVTxnOp{
Verb: api.KVSet,
Key: a.kvNodeListPath() + agent.Service.ID,
Value: bytes,
}
a.HasPartition(func(partition string) {
op.Partition = partition
})
ops = append(ops, op)
// Flush any ops if we're nearing a transaction limit
if len(ops) >= maximumTransactionSize {
if !a.commitOps(ops) {
retryTimer = time.After(retryTime)
goto WATCH_NODES_WAIT
}
ops = api.KVTxnOps{}
}
}
// Final flush for ops
if !a.commitOps(ops) {
retryTimer = time.After(retryTime)
continue
}
// Log a message when the balancing changes.
if !reflect.DeepEqual(healthNodes, prevHealthNodes) || !reflect.DeepEqual(pingNodes, prevPingNodes) {
a.logger.Info("Rebalanced external nodes across ESM instances", "nodes", len(externalNodes), "instances", len(healthyInstances))
prevHealthNodes = healthNodes
prevPingNodes = pingNodes
}
}
}
// watchExternalNodes does a watch for external nodes and returns any updates
// back through nodeCh as a sorted list.
func (a *Agent) watchExternalNodes(nodeCh chan []*api.Node, stopCh <-chan struct{}) {
opts := &api.QueryOptions{
NodeMeta: a.config.NodeMeta,
}
a.HasPartition(func(partition string) {
opts.Partition = partition
})
ctx, cancelFunc := context.WithCancel(context.Background())
opts = opts.WithContext(ctx)
go func() {
<-stopCh
cancelFunc()
}()
firstRun := true
for {
if !firstRun {
select {
case <-stopCh:
return
case <-time.After(retryTime):
// Sleep here to limit how much load we put on the Consul servers.
}
}
firstRun = false
// Do a blocking query for any external node changes
externalNodes, meta, err := a.client.Catalog().Nodes(opts)
if err != nil {
a.logger.Warn("Error getting external node list", "error", err)
continue
}
sort.Slice(externalNodes, func(a, b int) bool {
return externalNodes[a].Node < externalNodes[b].Node
})
opts.WaitIndex = meta.LastIndex
a.logger.Info("Updating external node list", "items", len(externalNodes))
nodeCh <- externalNodes
}
}
// watchServiceInstances does a watch for any ESM instances with the same service tag as
// this agent and sends any updates back through instanceCh as a sorted list.
func (a *Agent) watchServiceInstances(instanceCh chan []*api.ServiceEntry, stopCh <-chan struct{}) {
var opts *api.QueryOptions
ctx, cancelFunc := context.WithCancel(context.Background())
opts = opts.WithContext(ctx)
a.HasPartition(func(partition string) {
opts.Partition = partition
})
go func() {
<-stopCh
cancelFunc()
}()
for {
select {
case <-stopCh:
return
case <-time.After(retryTime / 10):
// Sleep here to limit how much load we put on the Consul servers.
// We can wait a lot less than the normal retry time here because
// the ESM service instance list is relatively small and cheap to
// query.
}
switch healthyInstances, err := a.getServiceInstances(opts); err {
case nil:
instanceCh <- healthyInstances
default:
a.logger.Warn("[WARN] Error querying for health check info",
"error", err)
continue // not needed, but nice to be explicit
}
}
}
// getServiceInstances retuns a list of services with a 'passing' (healthy) state.
// It loops over all available namespaces to get instances from each.
func (a *Agent) getServiceInstances(opts *api.QueryOptions) ([]*api.ServiceEntry, error) {
var healthyInstances []*api.ServiceEntry
var meta *api.QueryMeta
namespaces, err := namespacesList(a.client, a.config)
if err != nil {
return nil, err
}
for _, ns := range namespaces {
if ns.Name != "" {
a.logger.Info("checking namespaces for services", "name", ns.Name)
}
opts.Namespace = ns.Name
healthy, m, err := a.client.Health().Service(a.config.Service,
a.config.Tag, true, opts)
if err != nil {
return nil, err
}
meta = m // keep last good meta
for _, h := range healthy {
healthyInstances = append(healthyInstances, h)
}
}
opts.WaitIndex = meta.LastIndex
sort.Slice(healthyInstances, func(a, b int) bool {
return healthyInstances[a].Service.ID < healthyInstances[b].Service.ID
})
return healthyInstances, nil
}
// namespacesList returns a list of all accessable namespaces.
// Returns namespace "" (none) if none found for consul OSS compatibility.
func namespacesList(client *api.Client, config *Config) ([]*api.Namespace, error) {
opts := &api.QueryOptions{
Partition: config.Partition,
}
namespaces, _, err := client.Namespaces().List(opts)
if e, ok := err.(api.StatusError); ok && e.Code == 404 {
namespaces = []*api.Namespace{{Name: ""}}
} else if err != nil {
return nil, err
}
return namespaces, nil
}