-
Notifications
You must be signed in to change notification settings - Fork 2k
/
Copy pathcsi_hook.go
555 lines (468 loc) · 15.6 KB
/
csi_hook.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
// Copyright (c) HashiCorp, Inc.
// SPDX-License-Identifier: BUSL-1.1
package allocrunner
import (
"context"
"fmt"
"strings"
"sync"
"time"
hclog "github.com/hashicorp/go-hclog"
multierror "github.com/hashicorp/go-multierror"
"github.com/hashicorp/nomad/client/allocrunner/state"
"github.com/hashicorp/nomad/client/config"
"github.com/hashicorp/nomad/client/dynamicplugins"
"github.com/hashicorp/nomad/client/pluginmanager/csimanager"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/plugins/drivers"
)
// csiHook will wait for remote csi volumes to be attached to the host before
// continuing.
//
// It is a noop for allocs that do not depend on CSI Volumes.
type csiHook struct {
	alloc      *structs.Allocation
	logger     hclog.Logger
	csimanager csimanager.Manager

	// interfaces implemented by the allocRunner
	rpcClient       config.RPCer
	allocRunnerShim allocRunnerShim

	hookResources *cstructs.AllocHookResources

	// nodeSecret authenticates this client's RPCs to the server
	nodeSecret string

	// backoff tuning for the claim and unmount retry loops
	// (see claimWithRetry / unmountWithRetry)
	minBackoffInterval time.Duration
	maxBackoffInterval time.Duration
	maxBackoffDuration time.Duration

	// volumeResultsLock guards volumeResults; both Prerun and Postrun
	// take it so a stop arriving mid-setup can safely tear down
	// whatever has been published so far
	volumeResultsLock sync.Mutex
	volumeResults     map[string]*volumePublishResult // alias -> volumePublishResult

	// shutdownCtx is cancelled by Shutdown/Destroy so that the RPC retry
	// loops don't block client shutdown
	shutdownCtx      context.Context
	shutdownCancelFn context.CancelFunc
}
// allocRunnerShim is the subset of allocRunner behavior this hook needs:
// task driver capability queries and persistence of CSI volume state.
// implemented by allocrunner
type allocRunnerShim interface {
	// GetTaskDriverCapabilities returns the capabilities of the named
	// task's driver.
	GetTaskDriverCapabilities(string) (*drivers.Capabilities, error)

	// SetCSIVolumes persists the published volume stubs to client state.
	SetCSIVolumes(vols map[string]*state.CSIVolumeStub) error

	// GetCSIVolumes restores previously-persisted volume stubs.
	GetCSIVolumes() (map[string]*state.CSIVolumeStub, error)
}
// newCSIHook builds a csiHook for the given allocation. The hook owns a
// shutdown context that Shutdown/Destroy cancel so retry loops never block
// client shutdown.
func newCSIHook(alloc *structs.Allocation, logger hclog.Logger, csi csimanager.Manager, rpcClient config.RPCer, arShim allocRunnerShim, hookResources *cstructs.AllocHookResources, nodeSecret string) *csiHook {
	ctx, cancel := context.WithCancel(context.Background())

	hook := &csiHook{
		alloc:            alloc,
		logger:           logger.Named("csi_hook"),
		csimanager:       csi,
		rpcClient:        rpcClient,
		allocRunnerShim:  arShim,
		hookResources:    hookResources,
		nodeSecret:       nodeSecret,
		volumeResults:    map[string]*volumePublishResult{},
		shutdownCtx:      ctx,
		shutdownCancelFn: cancel,
	}

	// backoff tuning for the claim and unmount retry loops
	hook.minBackoffInterval = time.Second
	hook.maxBackoffInterval = time.Minute
	hook.maxBackoffDuration = time.Hour * 24

	return hook
}
// Name returns the identifier of this hook for logging and state tracking.
func (c *csiHook) Name() string {
	const hookName = "csi_hook"
	return hookName
}
// Prerun restores, claims, and mounts all CSI volumes the task group
// requests before the tasks start, then publishes the resulting mount info
// for the taskrunner's volume_hook. It is a no-op for allocations without
// CSI volumes.
func (c *csiHook) Prerun() error {
	if !c.shouldRun() {
		return nil
	}

	tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
	if err := c.validateTasksSupportCSI(tg); err != nil {
		return err
	}

	// Because operations on CSI volumes are expensive and can error, we do each
	// step for all volumes before proceeding to the next step so we have to
	// unwind less work. In practice, most allocations with volumes will only
	// have one or a few at most. We lock the results so that if an update/stop
	// comes in while we're running we can assert we'll safely tear down
	// everything that's been done so far.
	c.volumeResultsLock.Lock()
	defer c.volumeResultsLock.Unlock()

	// Initially, populate the result map with all of the requests
	for alias, volumeRequest := range tg.Volumes {
		if volumeRequest.Type == structs.VolumeTypeCSI {
			c.volumeResults[alias] = &volumePublishResult{
				request: volumeRequest,
				stub: &state.CSIVolumeStub{
					VolumeID: volumeRequest.VolumeID(c.alloc.Name)},
			}
		}
	}

	err := c.restoreMounts(c.volumeResults)
	if err != nil {
		return fmt.Errorf("restoring mounts: %w", err)
	}

	err = c.claimVolumes(c.volumeResults)
	if err != nil {
		return fmt.Errorf("claiming volumes: %w", err)
	}

	err = c.mountVolumes(c.volumeResults)
	if err != nil {
		return fmt.Errorf("mounting volumes: %w", err)
	}

	// make the mounts available to the taskrunner's volume_hook
	mounts := helper.ConvertMap(c.volumeResults,
		func(result *volumePublishResult) *csimanager.MountInfo {
			return result.stub.MountInfo
		})
	c.hookResources.SetCSIMounts(mounts)

	// persist the published mount info so we can restore on client restarts.
	// Previously this error was silently dropped; a failed write here means
	// restoreMounts can't work after a client restart, so surface it.
	stubs := helper.ConvertMap(c.volumeResults,
		func(result *volumePublishResult) *state.CSIVolumeStub {
			return result.stub
		})
	if err := c.allocRunnerShim.SetCSIVolumes(stubs); err != nil {
		return fmt.Errorf("persisting CSI volume state: %w", err)
	}

	return nil
}
// Postrun sends an RPC to the server to unpublish the volume. This may
// forward client RPCs to the node plugins or to the controller plugins,
// depending on whether other allocations on this node have claims on this
// volume.
func (c *csiHook) Postrun() error {
	if !c.shouldRun() {
		return nil
	}

	c.volumeResultsLock.Lock()
	defer c.volumeResultsLock.Unlock()

	var wg sync.WaitGroup
	// buffered to len(volumeResults) so no worker ever blocks on send
	errs := make(chan error, len(c.volumeResults))

	for _, result := range c.volumeResults {
		wg.Add(1)

		// CSI RPCs can potentially take a long time. Split the work
		// into goroutines so that operators could potentially reuse
		// one of a set of volumes
		go func(result *volumePublishResult) {
			defer wg.Done()
			err := c.unmountImpl(result)
			if err != nil {
				// we can recover an unmount failure if the operator
				// brings the plugin back up, so retry every few minutes
				// but eventually give up. Don't block shutdown so that
				// we don't block shutting down the client in -dev mode
				go func(result *volumePublishResult) {
					err := c.unmountWithRetry(result)
					if err != nil {
						c.logger.Error("volume could not be unmounted")
					}
					err = c.unpublish(result)
					if err != nil {
						c.logger.Error("volume could not be unpublished")
					}
				}(result)
			}

			// we can't recover from this RPC error client-side; the
			// volume claim GC job will have to clean up for us once
			// the allocation is marked terminal
			//
			// NOTE(review): unpublish is attempted here even when the
			// background unmount retry above was spawned, so unpublish
			// can run twice for the same volume — presumably the server
			// tolerates duplicate unpublish requests; verify
			errs <- c.unpublish(result)
		}(result)
	}

	wg.Wait()
	close(errs) // so we don't block waiting if there were no errors

	// collect one error per volume (nil entries are dropped by Append)
	var mErr *multierror.Error
	for err := range errs {
		mErr = multierror.Append(mErr, err)
	}

	return mErr.ErrorOrNil()
}
// volumePublishResult accumulates the per-volume state produced by Prerun's
// restore/claim/mount steps; Postrun uses it to unmount and unpublish.
type volumePublishResult struct {
	request        *structs.VolumeRequest // the request from the jobspec
	volume         *structs.CSIVolume     // the volume we get back from the server
	publishContext map[string]string      // populated after claim if provided by plugin
	stub           *state.CSIVolumeStub   // populated from volume, plugin, or stub
}
// validateTasksSupportCSI verifies that at least one task in the group uses a
// task driver that supports CSI. This prevents us from publishing CSI volumes
// only to find out once we get to the taskrunner/volume_hook that no task can
// mount them.
func (c *csiHook) validateTasksSupportCSI(tg *structs.TaskGroup) error {
	for _, task := range tg.Tasks {
		caps, err := c.allocRunnerShim.GetTaskDriverCapabilities(task.Name)
		if err != nil {
			return fmt.Errorf("could not validate task driver capabilities: %v", err)
		}
		if caps.MountConfigs != drivers.MountConfigSupportNone {
			// one mount-capable task is all we need
			return nil
		}
	}
	return fmt.Errorf("no task supports CSI")
}
// restoreMounts tries to restore the mount info from the local client state and
// then verifies it with the plugin. If the volume is already mounted, we don't
// want to re-run the claim and mount workflow again. This lets us tolerate
// restarting clients even on disconnected nodes.
func (c *csiHook) restoreMounts(results map[string]*volumePublishResult) error {
	stubs, err := c.allocRunnerShim.GetCSIVolumes()
	if err != nil {
		return err
	}
	if stubs == nil {
		return nil // no previous volumes
	}
	for _, result := range results {
		stub := stubs[result.request.Name]
		if stub == nil {
			// not in persisted state; this volume goes through the full
			// claim/mount workflow
			continue
		}

		result.stub = stub

		if result.stub.MountInfo != nil && result.stub.PluginID != "" {

			// make sure the plugin is ready or becomes so quickly.
			plugin := result.stub.PluginID
			pType := dynamicplugins.PluginTypeCSINode
			if err := c.csimanager.WaitForPlugin(c.shutdownCtx, pType, plugin); err != nil {
				return err
			}
			c.logger.Debug("found CSI plugin", "type", pType, "name", plugin)

			manager, err := c.csimanager.ManagerForPlugin(c.shutdownCtx, plugin)
			if err != nil {
				return err
			}

			// ask the node plugin whether the recorded mount still exists
			// on this host
			isMounted, err := manager.HasMount(c.shutdownCtx, result.stub.MountInfo)
			if err != nil {
				return err
			}
			if !isMounted {
				// the mount is gone, so clear this from our result state so it
				// we can try to remount it with the plugin ID we have
				result.stub.MountInfo = nil
			}
		}
	}

	return nil
}
// claimVolumes sends a claim to the server for each volume to mark it in use
// and kick off the controller publish workflow (optionally). On success it
// records the authoritative volume identifiers and publish context on each
// result for the subsequent mount step. Volumes restored as already mounted
// are skipped.
func (c *csiHook) claimVolumes(results map[string]*volumePublishResult) error {

	for _, result := range results {
		if result.stub.MountInfo != nil {
			continue // already mounted
		}

		request := result.request

		claimType := structs.CSIVolumeClaimWrite
		if request.ReadOnly {
			claimType = structs.CSIVolumeClaimRead
		}

		req := &structs.CSIVolumeClaimRequest{
			VolumeID:       result.stub.VolumeID,
			AllocationID:   c.alloc.ID,
			NodeID:         c.alloc.NodeID,
			ExternalNodeID: result.stub.ExternalNodeID,
			Claim:          claimType,
			AccessMode:     request.AccessMode,
			AttachmentMode: request.AttachmentMode,
			WriteRequest: structs.WriteRequest{
				Region:    c.alloc.Job.Region,
				Namespace: c.alloc.Job.Namespace,
				AuthToken: c.nodeSecret,
			},
		}

		resp, err := c.claimWithRetry(req)
		if err != nil {
			return fmt.Errorf("could not claim volume %s: %w", req.VolumeID, err)
		}
		if resp.Volume == nil {
			// lowercase per Go error-string convention (staticcheck ST1005)
			return fmt.Errorf("unexpected nil volume returned for ID: %v", request.Source)
		}

		result.volume = resp.Volume

		// populate data we'll write later to disk
		result.stub.VolumeID = resp.Volume.ID
		result.stub.VolumeNamespace = resp.Volume.Namespace
		result.stub.VolumeExternalID = resp.Volume.RemoteID()
		result.stub.PluginID = resp.Volume.PluginID
		result.publishContext = resp.PublishContext
	}

	return nil
}
// mountVolumes waits for the node plugin backing each claimed volume and
// asks it to stage/publish the volume on this host, recording the mount
// info on each result. Volumes restored as already mounted are skipped.
func (c *csiHook) mountVolumes(results map[string]*volumePublishResult) error {
	for _, res := range results {
		if res.stub.MountInfo != nil {
			// restored from state; nothing to do
			continue
		}
		if res.volume == nil {
			// claimVolumes always sets volume, so this is unreachable
			return fmt.Errorf("volume not available from claim for mounting volume request %q",
				res.request.Name)
		}

		// make sure the plugin is ready or becomes so quickly.
		pluginID := res.volume.PluginID
		pluginType := dynamicplugins.PluginTypeCSINode
		if err := c.csimanager.WaitForPlugin(c.shutdownCtx, pluginType, pluginID); err != nil {
			return err
		}
		c.logger.Debug("found CSI plugin", "type", pluginType, "name", pluginID)

		mgr, err := c.csimanager.ManagerForPlugin(c.shutdownCtx, pluginID)
		if err != nil {
			return err
		}

		opts := &csimanager.UsageOptions{
			ReadOnly:       res.request.ReadOnly,
			AttachmentMode: res.request.AttachmentMode,
			AccessMode:     res.request.AccessMode,
			MountOptions:   res.request.MountOptions,
		}

		info, err := mgr.MountVolume(
			c.shutdownCtx, res.volume, c.alloc, opts, res.publishContext)
		if err != nil {
			return err
		}
		res.stub.MountInfo = info
	}
	return nil
}
// claimWithRetry tries to claim the volume on the server, retrying
// with exponential backoff capped to a maximum interval
func (c *csiHook) claimWithRetry(req *structs.CSIVolumeClaimRequest) (*structs.CSIVolumeClaimResponse, error) {

	// give up after maxBackoffDuration, or earlier if the hook's
	// shutdown context is cancelled (Shutdown/Destroy)
	ctx, cancel := context.WithTimeout(c.shutdownCtx, c.maxBackoffDuration)
	defer cancel()

	var resp structs.CSIVolumeClaimResponse
	var err error
	backoff := c.minBackoffInterval

	// timer starts at 0 so the first attempt happens immediately
	t, stop := helper.NewSafeTimer(0)
	defer stop()

	for {
		select {
		case <-ctx.Done():
			// return the last RPC error (nil if we never got to attempt)
			return nil, err
		case <-t.C:
		}

		err = c.rpcClient.RPC("CSIVolume.Claim", req, &resp)
		if err == nil {
			break
		}

		// non-retryable errors fall through and are returned with the
		// (partially populated) response
		if !isRetryableClaimRPCError(err) {
			break
		}

		// double the backoff, capped at maxBackoffInterval
		if backoff < c.maxBackoffInterval {
			backoff = backoff * 2
			if backoff > c.maxBackoffInterval {
				backoff = c.maxBackoffInterval
			}
		}
		c.logger.Debug(
			"volume could not be claimed because it is in use", "retry_in", backoff)
		t.Reset(backoff)
	}
	return &resp, err
}
// isRetryableClaimRPCError looks for errors where we need to retry
// with backoff because we expect them to be eventually resolved.
func isRetryableClaimRPCError(err error) bool {
	// note: because these errors are returned via RPC which breaks error
	// wrapping, we can't check with errors.Is and need to read the string
	msg := err.Error()
	retryable := []string{
		structs.ErrCSIVolumeMaxClaims.Error(),
		structs.ErrCSIClientRPCRetryable.Error(),
		"no servers",
		structs.ErrNoLeader.Error(),
	}
	for _, want := range retryable {
		if strings.Contains(msg, want) {
			return true
		}
	}
	return false
}
// shouldRun reports whether this allocation's task group requests any CSI
// volumes; the hook is a no-op otherwise.
func (c *csiHook) shouldRun() bool {
	tg := c.alloc.Job.LookupTaskGroup(c.alloc.TaskGroup)
	found := false
	for _, vol := range tg.Volumes {
		if vol.Type == structs.VolumeTypeCSI {
			found = true
			break
		}
	}
	return found
}
// unpublish asks the server to release this allocation's claim on the
// volume, putting the claim into the unpublishing state.
func (c *csiHook) unpublish(result *volumePublishResult) error {

	claimMode := structs.CSIVolumeClaimWrite
	if result.request.ReadOnly {
		claimMode = structs.CSIVolumeClaimRead
	}

	volID := result.request.Source
	if result.request.PerAlloc {
		// NOTE: PerAlloc can't be set if we have canaries
		volID += structs.AllocSuffix(c.alloc.Name)
	}

	req := &structs.CSIVolumeUnpublishRequest{
		VolumeID: volID,
		Claim: &structs.CSIVolumeClaim{
			AllocationID: c.alloc.ID,
			NodeID:       c.alloc.NodeID,
			Mode:         claimMode,
			State:        structs.CSIVolumeClaimStateUnpublishing,
		},
		WriteRequest: structs.WriteRequest{
			Region:    c.alloc.Job.Region,
			Namespace: c.alloc.Job.Namespace,
			AuthToken: c.nodeSecret,
		},
	}

	return c.rpcClient.RPC("CSIVolume.Unpublish",
		req, &structs.CSIVolumeUnpublishResponse{})
}
// unmountWithRetry tries to unmount/unstage the volume, retrying with
// exponential backoff capped to a maximum interval
func (c *csiHook) unmountWithRetry(result *volumePublishResult) error {

	// give up after maxBackoffDuration, or earlier if the hook's
	// shutdown context is cancelled (Shutdown/Destroy)
	ctx, cancel := context.WithTimeout(c.shutdownCtx, c.maxBackoffDuration)
	defer cancel()
	var err error
	backoff := c.minBackoffInterval

	// timer starts at 0 so the first attempt happens immediately
	t, stop := helper.NewSafeTimer(0)
	defer stop()
	for {
		select {
		case <-ctx.Done():
			// return the last unmount error so the caller can log it
			return err
		case <-t.C:
		}

		err = c.unmountImpl(result)
		if err == nil {
			break
		}

		// double the backoff, capped at maxBackoffInterval
		if backoff < c.maxBackoffInterval {
			backoff = backoff * 2
			if backoff > c.maxBackoffInterval {
				backoff = c.maxBackoffInterval
			}
		}
		c.logger.Debug("volume could not be unmounted", "retry_in", backoff)
		t.Reset(backoff)
	}

	return nil
}
// unmountImpl implements the call to the CSI plugin manager to
// unmount the volume. Each retry will write an "Unmount volume"
// NodeEvent
func (c *csiHook) unmountImpl(result *volumePublishResult) error {

	mgr, err := c.csimanager.ManagerForPlugin(c.shutdownCtx, result.stub.PluginID)
	if err != nil {
		return err
	}

	opts := &csimanager.UsageOptions{
		ReadOnly:       result.request.ReadOnly,
		AttachmentMode: result.request.AttachmentMode,
		AccessMode:     result.request.AccessMode,
		MountOptions:   result.request.MountOptions,
	}

	stub := result.stub
	return mgr.UnmountVolume(c.shutdownCtx,
		stub.VolumeNamespace, stub.VolumeID,
		stub.VolumeExternalID, c.alloc.ID, opts)
}
// Shutdown will get called when the client is gracefully
// stopping. Cancel our shutdown context so that we don't block client
// shutdown while in the CSI RPC retry loop.
func (c *csiHook) Shutdown() {
	c.logger.Trace("shutting down hook")
	// cancels shutdownCtx; in-flight retry loops observe ctx.Done()
	c.shutdownCancelFn()
}
// Destroy will get called when an allocation gets GC'd on the client
// or when a -dev mode client is stopped. Cancel our shutdown context
// so that we don't block client shutdown while in the CSI RPC retry
// loop.
func (c *csiHook) Destroy() {
	c.logger.Trace("destroying hook")
	// cancels shutdownCtx; in-flight retry loops observe ctx.Done()
	c.shutdownCancelFn()
}