-
Notifications
You must be signed in to change notification settings - Fork 2k
/
Copy pathplugin_supervisor_hook.go
373 lines (320 loc) · 12.3 KB
/
plugin_supervisor_hook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
package taskrunner
import (
"context"
"fmt"
"os"
"path/filepath"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/nomad/client/allocrunner/interfaces"
ti "github.com/hashicorp/nomad/client/allocrunner/taskrunner/interfaces"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/csi"
"github.com/hashicorp/nomad/plugins/drivers"
)
// csiPluginSupervisorHook manages supervising plugins that are running as Nomad
// tasks. These plugins will be fingerprinted and it will manage connecting them
// to their requisite plugin manager.
//
// It provides a couple of things to a task running inside Nomad. These are:
// * A mount to the `plugin_mount_dir`, that will then be used by Nomad
// to connect to the nested plugin and handle volume mounts.
// * When the task has started, it starts a loop of attempting to connect to the
// plugin, to perform initial fingerprinting of the plugins capabilities before
// notifying the plugin manager of the plugin.
type csiPluginSupervisorHook struct {
logger hclog.Logger
alloc *structs.Allocation
task *structs.Task
runner *TaskRunner
mountPoint string
// eventEmitter is used to emit events to the task
eventEmitter ti.EventEmitter
shutdownCtx context.Context
shutdownCancelFn context.CancelFunc
running bool
runningLock sync.Mutex
// previousHealthstate is used by the supervisor goroutine to track historic
// health states for gating task events.
previousHealthState bool
}
// The plugin supervisor uses the PrestartHook mechanism to setup the requisite
// mount points and configuration for the task that exposes a CSI plugin.
var _ interfaces.TaskPrestartHook = &csiPluginSupervisorHook{}
// The plugin supervisor uses the PoststartHook mechanism to start polling the
// plugin for readiness and supported functionality before registering the
// plugin with the catalog.
var _ interfaces.TaskPoststartHook = &csiPluginSupervisorHook{}
// The plugin supervisor uses the StopHook mechanism to deregister the plugin
// with the catalog and to ensure any mounts are cleaned up.
var _ interfaces.TaskStopHook = &csiPluginSupervisorHook{}
func newCSIPluginSupervisorHook(csiRootDir string, eventEmitter ti.EventEmitter, runner *TaskRunner, logger hclog.Logger) *csiPluginSupervisorHook {
task := runner.Task()
// The Plugin directory will look something like this:
// .
// ..
// csi.sock - A unix domain socket used to communicate with the CSI Plugin
// staging/
// {volume-id}/{usage-mode-hash}/ - Intermediary mount point that will be used by plugins that support NODE_STAGE_UNSTAGE capabilities.
// per-alloc/
// {alloc-id}/{volume-id}/{usage-mode-hash}/ - Mount Point that will be bind-mounted into tasks that utilise the volume
pluginRoot := filepath.Join(csiRootDir, string(task.CSIPluginConfig.Type), task.CSIPluginConfig.ID)
shutdownCtx, cancelFn := context.WithCancel(context.Background())
hook := &csiPluginSupervisorHook{
alloc: runner.Alloc(),
runner: runner,
logger: logger,
task: task,
mountPoint: pluginRoot,
shutdownCtx: shutdownCtx,
shutdownCancelFn: cancelFn,
eventEmitter: eventEmitter,
}
return hook
}
func (*csiPluginSupervisorHook) Name() string {
return "csi_plugin_supervisor"
}
// Prestart is called before the task is started including after every
// restart. This requires that the mount paths for a plugin be idempotent,
// despite us not knowing the name of the plugin ahead of time.
// Because of this, we use the allocid_taskname as the unique identifier for a
// plugin on the filesystem.
func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
req *interfaces.TaskPrestartRequest, resp *interfaces.TaskPrestartResponse) error {
// Create the mount directory that the container will access if it doesn't
// already exist. Default to only nomad user access.
if err := os.MkdirAll(h.mountPoint, 0700); err != nil && !os.IsExist(err) {
return fmt.Errorf("failed to create mount point: %v", err)
}
configMount := &drivers.MountConfig{
TaskPath: h.task.CSIPluginConfig.MountDir,
HostPath: h.mountPoint,
Readonly: false,
PropagationMode: "bidirectional",
}
devMount := &drivers.MountConfig{
TaskPath: "/dev",
HostPath: "/dev",
Readonly: false,
}
mounts := ensureMountpointInserted(h.runner.hookResources.getMounts(), configMount)
mounts = ensureMountpointInserted(mounts, devMount)
h.runner.hookResources.setMounts(mounts)
resp.Done = true
return nil
}
// Poststart is called after the task has started. Poststart is not
// called if the allocation is terminal.
//
// The context is cancelled if the task is killed.
func (h *csiPluginSupervisorHook) Poststart(_ context.Context, _ *interfaces.TaskPoststartRequest, _ *interfaces.TaskPoststartResponse) error {
// If we're already running the supervisor routine, then we don't need to try
// and restart it here as it only terminates on `Stop` hooks.
h.runningLock.Lock()
if h.running {
h.runningLock.Unlock()
return nil
}
h.runningLock.Unlock()
go h.ensureSupervisorLoop(h.shutdownCtx)
return nil
}
// ensureSupervisorLoop should be called in a goroutine. It will terminate when
// the passed in context is terminated.
//
// The supervisor works by:
// - Initially waiting for the plugin to become available. This loop is expensive
// and may do things like create new gRPC Clients on every iteration.
// - After receiving an initial healthy status, it will inform the plugin catalog
// of the plugin, registering it with the plugins fingerprinted capabilities.
// - We then perform a more lightweight check, simply probing the plugin on a less
// frequent interval to ensure it is still alive, emitting task events when this
// status changes.
//
// Deeper fingerprinting of the plugin is implemented by the csimanager.
func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
h.runningLock.Lock()
if h.running == true {
h.runningLock.Unlock()
return
}
h.running = true
h.runningLock.Unlock()
defer func() {
h.runningLock.Lock()
h.running = false
h.runningLock.Unlock()
}()
socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)
t := time.NewTimer(0)
// Step 1: Wait for the plugin to initially become available.
WAITFORREADY:
for {
select {
case <-ctx.Done():
return
case <-t.C:
pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath)
if err != nil || !pluginHealthy {
h.logger.Debug("CSI Plugin not ready", "error", err)
// Plugin is not yet returning healthy, because we want to optimise for
// quickly bringing a plugin online, we use a short timeout here.
// TODO(dani): Test with more plugins and adjust.
t.Reset(5 * time.Second)
continue
}
// Mark the plugin as healthy in a task event
h.previousHealthState = pluginHealthy
event := structs.NewTaskEvent(structs.TaskPluginHealthy)
event.SetMessage(fmt.Sprintf("plugin: %s", h.task.CSIPluginConfig.ID))
h.eventEmitter.EmitEvent(event)
break WAITFORREADY
}
}
// Step 2: Register the plugin with the catalog.
deregisterPluginFn, err := h.registerPlugin(socketPath)
if err != nil {
h.logger.Error("CSI Plugin registration failed", "error", err)
event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
event.SetMessage(fmt.Sprintf("failed to register plugin: %s, reason: %v", h.task.CSIPluginConfig.ID, err))
h.eventEmitter.EmitEvent(event)
}
// Step 3: Start the lightweight supervisor loop.
t.Reset(0)
for {
select {
case <-ctx.Done():
// De-register plugins on task shutdown
deregisterPluginFn()
return
case <-t.C:
pluginHealthy, err := h.supervisorLoopOnce(ctx, socketPath)
if err != nil {
h.logger.Error("CSI Plugin fingerprinting failed", "error", err)
}
// The plugin has transitioned to a healthy state. Emit an event.
if !h.previousHealthState && pluginHealthy {
event := structs.NewTaskEvent(structs.TaskPluginHealthy)
event.SetMessage(fmt.Sprintf("plugin: %s", h.task.CSIPluginConfig.ID))
h.eventEmitter.EmitEvent(event)
}
// The plugin has transitioned to an unhealthy state. Emit an event.
if h.previousHealthState && !pluginHealthy {
event := structs.NewTaskEvent(structs.TaskPluginUnhealthy)
if err != nil {
event.SetMessage(fmt.Sprintf("error: %v", err))
} else {
event.SetMessage("Unknown Reason")
}
h.eventEmitter.EmitEvent(event)
}
h.previousHealthState = pluginHealthy
// This loop is informational and in some plugins this may be expensive to
// validate. We use a longer timeout (30s) to avoid causing undue work.
t.Reset(30 * time.Second)
}
}
}
func (h *csiPluginSupervisorHook) registerPlugin(socketPath string) (func(), error) {
// At this point we know the plugin is ready and we can fingerprint it
// to get its vendor name and version
client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()
if err != nil {
return nil, fmt.Errorf("failed to create csi client: %v", err)
}
info, err := client.PluginInfo()
if err != nil {
return nil, fmt.Errorf("failed to probe plugin: %v", err)
}
mkInfoFn := func(pluginType string) *dynamicplugins.PluginInfo {
return &dynamicplugins.PluginInfo{
Type: pluginType,
Name: h.task.CSIPluginConfig.ID,
Version: info.PluginVersion,
ConnectionInfo: &dynamicplugins.PluginConnectionInfo{
SocketPath: socketPath,
},
AllocID: h.alloc.ID,
Options: map[string]string{
"Provider": info.Name, // vendor name
"MountPoint": h.mountPoint,
"ContainerMountPoint": h.task.CSIPluginConfig.MountDir,
},
}
}
registrations := []*dynamicplugins.PluginInfo{}
switch h.task.CSIPluginConfig.Type {
case structs.CSIPluginTypeController:
registrations = append(registrations, mkInfoFn(dynamicplugins.PluginTypeCSIController))
case structs.CSIPluginTypeNode:
registrations = append(registrations, mkInfoFn(dynamicplugins.PluginTypeCSINode))
case structs.CSIPluginTypeMonolith:
registrations = append(registrations, mkInfoFn(dynamicplugins.PluginTypeCSIController))
registrations = append(registrations, mkInfoFn(dynamicplugins.PluginTypeCSINode))
}
deregistrationFns := []func(){}
for _, reg := range registrations {
if err := h.runner.dynamicRegistry.RegisterPlugin(reg); err != nil {
for _, fn := range deregistrationFns {
fn()
}
return nil, err
}
// need to rebind these so that each deregistration function
// closes over its own registration
rname := reg.Name
rtype := reg.Type
deregistrationFns = append(deregistrationFns, func() {
err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname)
if err != nil {
h.logger.Error("failed to deregister csi plugin", "name", rname, "type", rtype, "error", err)
}
})
}
return func() {
for _, fn := range deregistrationFns {
fn()
}
}, nil
}
func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, socketPath string) (bool, error) {
_, err := os.Stat(socketPath)
if err != nil {
return false, fmt.Errorf("failed to stat socket: %v", err)
}
client, err := csi.NewClient(socketPath, h.logger.Named("csi_client").With("plugin.name", h.task.CSIPluginConfig.ID, "plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()
if err != nil {
return false, fmt.Errorf("failed to create csi client: %v", err)
}
healthy, err := client.PluginProbe(ctx)
if err != nil {
return false, fmt.Errorf("failed to probe plugin: %v", err)
}
return healthy, nil
}
// Stop is called after the task has exited and will not be started
// again. It is the only hook guaranteed to be executed whenever
// TaskRunner.Run is called (and not gracefully shutting down).
// Therefore it may be called even when prestart and the other hooks
// have not.
//
// Stop hooks must be idempotent. The context is cancelled prematurely if the
// task is killed.
func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error {
h.shutdownCancelFn()
return nil
}
func ensureMountpointInserted(mounts []*drivers.MountConfig, mount *drivers.MountConfig) []*drivers.MountConfig {
for _, mnt := range mounts {
if mnt.IsEqual(mount) {
return mounts
}
}
mounts = append(mounts, mount)
return mounts
}