plugin.go

// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package nomad

import (
	"fmt"
	"strings"
	"sync"
	"time"

	hclog "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad-autoscaler/plugins"
	"github.com/hashicorp/nomad-autoscaler/plugins/base"
	"github.com/hashicorp/nomad-autoscaler/plugins/target"
	"github.com/hashicorp/nomad-autoscaler/sdk"
	nomadHelper "github.com/hashicorp/nomad-autoscaler/sdk/helper/nomad"
	"github.com/hashicorp/nomad/api"
)

const (
	// pluginName is the unique name of this plugin amongst target plugins.
	pluginName = "nomad-target"

	// configKeys are the accepted configuration map keys which can be
	// processed when performing SetConfig().
	configKeyJobID     = "Job"
	configKeyGroup     = "Group"
	configKeyNamespace = "Namespace"

	// garbageCollectionNanoSecondThreshold is the nanosecond threshold used
	// when performing garbage collection of job status handlers. It equates
	// to 4 hours.
	garbageCollectionNanoSecondThreshold = 14400000000000

	// garbageCollectionSecondInterval is the interval in seconds at which the
	// garbage collector will run.
	garbageCollectionSecondInterval = 60
)
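
// exampleTargetConfig is an illustrative sketch only, not part of the
// upstream plugin: it shows the shape of the target config map that Scale and
// Status expect, using the keys defined above. The job, group, and namespace
// values are hypothetical.
func exampleTargetConfig() map[string]string {
	return map[string]string{
		configKeyJobID:     "example-job",
		configKeyGroup:     "example-group",
		configKeyNamespace: "default",
	}
}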

var (
	PluginID = plugins.PluginID{
		Name:       pluginName,
		PluginType: sdk.PluginTypeTarget,
	}

	PluginConfig = &plugins.InternalPluginConfig{
		Factory: func(l hclog.Logger) interface{} { return NewNomadPlugin(l) },
	}

	pluginInfo = &base.PluginInfo{
		Name:       pluginName,
		PluginType: sdk.PluginTypeTarget,
	}
)
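
// examplePluginFactoryUsage is an illustrative sketch only, not part of the
// upstream plugin: it shows how the internal plugin factory defined above can
// yield a TargetPlugin that is ready to be configured. The type assertion
// relies on the factory returning the concrete type created by NewNomadPlugin.
func examplePluginFactoryUsage(logger hclog.Logger) *TargetPlugin {
	return PluginConfig.Factory(logger).(*TargetPlugin)
}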

// Assert that TargetPlugin meets the target.Target interface.
var _ target.Target = (*TargetPlugin)(nil)

// TargetPlugin is the Nomad implementation of the target.Target interface.
type TargetPlugin struct {
	client *api.Client
	logger hclog.Logger

	// statusHandlers is a mapping of jobScaleStatusHandlers keyed by the
	// namespacedJobID that the handler represents. The lock should be used
	// when accessing the map.
	statusHandlers     map[namespacedJobID]*jobScaleStatusHandler
	statusHandlersLock sync.RWMutex

	// gcRunning indicates whether the GC loop is running or not.
	gcRunning     bool
	gcRunningLock sync.RWMutex
}

// namespacedJobID encapsulates the namespace and jobID, which together make a
// unique job reference within a Nomad region.
type namespacedJobID struct {
	namespace, job string
}

// NewNomadPlugin returns the Nomad implementation of the target.Target
// interface.
func NewNomadPlugin(log hclog.Logger) *TargetPlugin {
	return &TargetPlugin{
		logger:         log,
		statusHandlers: make(map[namespacedJobID]*jobScaleStatusHandler),
	}
}

// SetConfig satisfies the SetConfig function on the base.Base interface.
func (t *TargetPlugin) SetConfig(config map[string]string) error {

	t.gcRunningLock.RLock()
	defer t.gcRunningLock.RUnlock()

	if !t.gcRunning {
		go t.garbageCollectionLoop()
	}

	cfg := nomadHelper.ConfigFromNamespacedMap(config)

	client, err := api.NewClient(cfg)
	if err != nil {
		return fmt.Errorf("failed to instantiate Nomad client: %v", err)
	}
	t.client = client

	// Acquire a write lock on the handlers map so we can safely interact
	// with it.
	t.statusHandlersLock.Lock()
	defer t.statusHandlersLock.Unlock()

	// Reload the Nomad client on the existing handlers.
	for _, sh := range t.statusHandlers {
		sh.client = client
	}

	return nil
}
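
// exampleSetConfigUsage is an illustrative sketch only, not from the upstream
// plugin: the autoscaler agent normally constructs the plugin and then calls
// SetConfig with its namespaced Nomad connection settings. Passing an empty
// map here simply lets the Nomad api package fall back to its defaults and
// NOMAD_* environment variables.
func exampleSetConfigUsage() (*TargetPlugin, error) {
	p := NewNomadPlugin(hclog.NewNullLogger())
	if err := p.SetConfig(map[string]string{}); err != nil {
		return nil, err
	}
	return p, nil
}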

// PluginInfo satisfies the PluginInfo function on the base.Base interface.
func (t *TargetPlugin) PluginInfo() (*base.PluginInfo, error) {
	return pluginInfo, nil
}

// Scale satisfies the Scale function on the target.Target interface.
func (t *TargetPlugin) Scale(action sdk.ScalingAction, config map[string]string) error {

	var countIntPtr *int
	if action.Count != sdk.StrategyActionMetaValueDryRunCount {
		countInt := int(action.Count)
		countIntPtr = &countInt
	}

	// Set up the Nomad write options.
	q := api.WriteOptions{}

	// If a namespace is included within the config, add it to the write
	// options. If it is omitted, we fall back to Nomad standard practice.
	if namespace, ok := config[configKeyNamespace]; ok {
		q.Namespace = namespace
	}

	_, _, err := t.client.Jobs().Scale(config[configKeyJobID],
		config[configKeyGroup],
		countIntPtr,
		action.Reason,
		action.Error,
		action.Meta,
		&q)
	if err != nil {
		// Active deployment errors are fairly common and usually not
		// impactful to the target's eventual end state, so special case them
		// to return a no-op error instead.
		if strings.Contains(err.Error(), "job scaling blocked due to active deployment") {
			return sdk.NewTargetScalingNoOpError("skipping scaling group %s/%s due to active deployment", config[configKeyJobID], config[configKeyGroup])
		}
		return fmt.Errorf("failed to scale group %s/%s: %v", config[configKeyJobID], config[configKeyGroup], err)
	}

	return nil
}
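
// exampleScaleUsage is an illustrative sketch only, not from the upstream
// plugin: it drives Scale with the hypothetical target config from
// exampleTargetConfig above and a simple scale-out action.
func exampleScaleUsage(p *TargetPlugin) error {
	action := sdk.ScalingAction{
		Count:  3,
		Reason: "example scale out",
	}
	return p.Scale(action, exampleTargetConfig())
}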

// Status satisfies the Status function on the target.Target interface.
func (t *TargetPlugin) Status(config map[string]string) (*sdk.TargetStatus, error) {

	// Get the JobID from the config map. This is a required param and
	// results in an error if not found or if it is an empty string.
	jobID, ok := config[configKeyJobID]
	if !ok || jobID == "" {
		return nil, fmt.Errorf("required config key %q not found", configKeyJobID)
	}

	// Get the GroupName from the config map. This is a required param and
	// results in an error if not found or if it is an empty string.
	group, ok := config[configKeyGroup]
	if !ok || group == "" {
		return nil, fmt.Errorf("required config key %q not found", configKeyGroup)
	}

	// Attempt to find the namespace config parameter. If this is not
	// included, use the Nomad default namespace "default".
	namespace, ok := config[configKeyNamespace]
	if !ok || namespace == "" {
		namespace = "default"
	}

	nsID := namespacedJobID{namespace: namespace, job: jobID}

	// Acquire a write lock on the handlers map so we can safely interact
	// with it.
	t.statusHandlersLock.Lock()
	defer t.statusHandlersLock.Unlock()

	// Create a handler for the job if one does not currently exist, or if an
	// existing one has stopped running but is not yet GC'd.
	if h, ok := t.statusHandlers[nsID]; !ok || !h.running() {
		jsh, err := newJobScaleStatusHandler(t.client, namespace, jobID, t.logger)
		if err != nil {
			return nil, err
		}

		t.statusHandlers[nsID] = jsh
	}

	return t.statusHandlers[nsID].status(group)
}
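
// exampleStatusUsage is an illustrative sketch only, not from the upstream
// plugin: it queries Status with the hypothetical target config and reports
// whether the group is ready to be scaled along with its current count.
func exampleStatusUsage(p *TargetPlugin) error {
	status, err := p.Status(exampleTargetConfig())
	if err != nil {
		return err
	}
	p.logger.Info("target status", "ready", status.Ready, "count", status.Count)
	return nil
}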

// garbageCollectionLoop runs a long-lived loop, triggering the garbage
// collector at the specified interval.
func (t *TargetPlugin) garbageCollectionLoop() {

	// Set up the ticker and mark that the loop is now running.
	ticker := time.NewTicker(garbageCollectionSecondInterval * time.Second)
	t.gcRunningLock.Lock()
	t.gcRunning = true
	t.gcRunningLock.Unlock()

	for range ticker.C {
		t.logger.Debug("triggering run of handler garbage collection")
		t.garbageCollect()
	}
}

// garbageCollect runs a single round of status handler garbage collection.
func (t *TargetPlugin) garbageCollect() {

	// Generate the GC threshold based on the current time.
	threshold := time.Now().UTC().UnixNano() - garbageCollectionNanoSecondThreshold

	// Iterate all the handlers, ensuring we lock for safety.
	t.statusHandlersLock.Lock()
	defer t.statusHandlersLock.Unlock()

	for jobID, handle := range t.statusHandlers {
		if handle.shouldGC(threshold) {
			delete(t.statusHandlers, jobID)
			t.logger.Debug("removed inactive job status handler", "job_id", jobID)
		}
	}
}
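
// exampleGCThreshold is an illustrative sketch only, not from the upstream
// plugin: it shows that the nanosecond GC threshold constant equates to four
// hours, the age at which an inactive status handler becomes eligible for
// removal by garbageCollect.
func exampleGCThreshold() time.Duration {
	return time.Duration(garbageCollectionNanoSecondThreshold) // 4h0m0s
}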