Skip to content

Commit

Permalink
CSI: allow for concurrent plugin allocations (#12078)
Browse files Browse the repository at this point in the history
The dynamic plugin registry assumes that plugins are singletons, which
matches the behavior of other Nomad plugins. But because dynamic
plugins like CSI are implemented by allocations, we need to handle the
possibility of multiple allocations for a given plugin type + ID, as
well as behaviors around interleaved allocation starts and stops.

Update the data structure for the dynamic registry so that more recent
allocations take over as the instance manager singleton, but we still
preserve the previous running allocations so that restores work
without racing.

Multiple allocations can run on a client for the same plugin, even if
only during updates. Provide each plugin task a unique path for the
control socket so that the tasks don't interfere with each other.
  • Loading branch information
tgross authored Feb 23, 2022
1 parent 822285f commit 7bcf0af
Show file tree
Hide file tree
Showing 13 changed files with 672 additions and 192 deletions.
11 changes: 11 additions & 0 deletions .changelog/12078.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
```release-note:improvement
csi: Allow for concurrent plugin allocations
```

```release-note:breaking-change
client: The client state store will be automatically migrated to a new schema version when upgrading a client. Downgrading to a previous version of the client after upgrading it to Nomad 1.3 is not supported. To downgrade safely, users should erase the Nomad client's data directory.
```

```release-note:breaking-change
csi: The client filesystem layout for CSI plugins has been updated to correctly handle the lifecycle of multiple allocations serving the same plugin. Running plugin tasks will not be updated after upgrading the client, but it is recommended to redeploy CSI plugin jobs after upgrading the cluster.
```
100 changes: 76 additions & 24 deletions client/allocrunner/taskrunner/plugin_supervisor_hook.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,20 @@ import (
// tasks. These plugins will be fingerprinted and it will manage connecting them
// to their requisite plugin manager.
//
// It provides a couple of things to a task running inside Nomad. These are:
// * A mount to the `plugin_mount_dir`, that will then be used by Nomad
// to connect to the nested plugin and handle volume mounts.
// It provides a few things to a plugin task running inside Nomad. These are:
// * A mount to the `csi_plugin.mount_dir` where the plugin will create its csi.sock
// * A mount to `local/csi` that node plugins will use to stage volume mounts.
// * When the task has started, it starts a loop of attempting to connect to the
// plugin, to perform initial fingerprinting of the plugins capabilities before
// notifying the plugin manager of the plugin.
type csiPluginSupervisorHook struct {
logger hclog.Logger
alloc *structs.Allocation
task *structs.Task
runner *TaskRunner
mountPoint string
logger hclog.Logger
alloc *structs.Allocation
task *structs.Task
runner *TaskRunner
mountPoint string
socketMountPoint string
socketPath string

caps *drivers.Capabilities

Expand Down Expand Up @@ -73,20 +75,36 @@ var _ interfaces.TaskPoststartHook = &csiPluginSupervisorHook{}
// with the catalog and to ensure any mounts are cleaned up.
var _ interfaces.TaskStopHook = &csiPluginSupervisorHook{}

// This hook creates a csi/ directory within the client's datadir used to
// manage plugins and mount points volumes. The layout is as follows:

// plugins/
// {alloc-id}/csi.sock
// Per-allocation directories of unix domain sockets used to communicate
// with the CSI plugin. Nomad creates the directory and the plugin creates
// the socket file. This directory is bind-mounted to the
// csi_plugin.mount_config dir in the plugin task.
//
// {plugin-type}/{plugin-id}/
// staging/
// {volume-id}/{usage-mode}/
// Intermediate mount point used by node plugins that support
// NODE_STAGE_UNSTAGE capability.
//
// per-alloc/
// {alloc-id}/{volume-id}/{usage-mode}/
// Mount point bound from the staging directory into tasks that use
// the mounted volumes

func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPluginSupervisorHook {
task := config.runner.Task()

// The Plugin directory will look something like this:
// .
// ..
// csi.sock - A unix domain socket used to communicate with the CSI Plugin
// staging/
// {volume-id}/{usage-mode-hash}/ - Intermediary mount point that will be used by plugins that support NODE_STAGE_UNSTAGE capabilities.
// per-alloc/
// {alloc-id}/{volume-id}/{usage-mode-hash}/ - Mount Point that will be bind-mounted into tasks that utilise the volume
pluginRoot := filepath.Join(config.clientStateDirPath, "csi",
string(task.CSIPluginConfig.Type), task.CSIPluginConfig.ID)

socketMountPoint := filepath.Join(config.clientStateDirPath, "csi",
"plugins", config.runner.Alloc().ID)

shutdownCtx, cancelFn := context.WithCancel(context.Background())

hook := &csiPluginSupervisorHook{
Expand All @@ -96,6 +114,7 @@ func newCSIPluginSupervisorHook(config *csiPluginSupervisorHookConfig) *csiPlugi
logger: config.logger,
task: task,
mountPoint: pluginRoot,
socketMountPoint: socketMountPoint,
caps: config.capabilities,
shutdownCtx: shutdownCtx,
shutdownCancelFn: cancelFn,
Expand All @@ -122,18 +141,46 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
return fmt.Errorf("failed to create mount point: %v", err)
}

if err := os.MkdirAll(h.socketMountPoint, 0700); err != nil && !os.IsExist(err) {
return fmt.Errorf("failed to create socket mount point: %v", err)
}

// where the socket will be mounted
configMount := &drivers.MountConfig{
TaskPath: h.task.CSIPluginConfig.MountDir,
HostPath: h.socketMountPoint,
Readonly: false,
PropagationMode: "bidirectional",
}
// where the staging and per-alloc directories will be mounted
volumeStagingMounts := &drivers.MountConfig{
// TODO(tgross): add this TaskPath to the CSIPluginConfig as well
TaskPath: "/local/csi",
HostPath: h.mountPoint,
Readonly: false,
PropagationMode: "bidirectional",
}
// devices from the host
devMount := &drivers.MountConfig{
TaskPath: "/dev",
HostPath: "/dev",
Readonly: false,
}

// TODO(tgross): https://github.com/hashicorp/nomad/issues/11786
// If we're already registered, we should be able to update the
// definition in the update hook

// For backwards compatibility, ensure that we don't overwrite the
// socketPath on client restart with existing plugin allocations.
pluginInfo, _ := h.runner.dynamicRegistry.PluginForAlloc(
string(h.task.CSIPluginConfig.Type), h.task.CSIPluginConfig.ID, h.alloc.ID)
if pluginInfo != nil {
h.socketPath = pluginInfo.ConnectionInfo.SocketPath
} else {
h.socketPath = filepath.Join(h.socketMountPoint, structs.CSISocketName)
}

switch h.caps.FSIsolation {
case drivers.FSIsolationNone:
// Plugin tasks with no filesystem isolation won't have the
Expand All @@ -142,13 +189,15 @@ func (h *csiPluginSupervisorHook) Prestart(ctx context.Context,
// plugins will need to be aware of the csi directory layout
// in the client data dir
resp.Env = map[string]string{
"CSI_ENDPOINT": filepath.Join(h.mountPoint, "csi.sock")}
"CSI_ENDPOINT": h.socketPath}
default:
resp.Env = map[string]string{
"CSI_ENDPOINT": filepath.Join(h.task.CSIPluginConfig.MountDir, "csi.sock")}
"CSI_ENDPOINT": filepath.Join(
h.task.CSIPluginConfig.MountDir, structs.CSISocketName)}
}

mounts := ensureMountpointInserted(h.runner.hookResources.getMounts(), configMount)
mounts = ensureMountpointInserted(mounts, volumeStagingMounts)
mounts = ensureMountpointInserted(mounts, devMount)

h.runner.hookResources.setMounts(mounts)
Expand Down Expand Up @@ -203,9 +252,7 @@ func (h *csiPluginSupervisorHook) ensureSupervisorLoop(ctx context.Context) {
h.runningLock.Unlock()
}()

socketPath := filepath.Join(h.mountPoint, structs.CSISocketName)

client := csi.NewClient(socketPath, h.logger.Named("csi_client").With(
client := csi.NewClient(h.socketPath, h.logger.Named("csi_client").With(
"plugin.name", h.task.CSIPluginConfig.ID,
"plugin.type", h.task.CSIPluginConfig.Type))
defer client.Close()
Expand Down Expand Up @@ -249,7 +296,7 @@ WAITFORREADY:
}

// Step 2: Register the plugin with the catalog.
deregisterPluginFn, err := h.registerPlugin(client, socketPath)
deregisterPluginFn, err := h.registerPlugin(client, h.socketPath)
if err != nil {
h.kill(ctx, fmt.Errorf("CSI plugin failed to register: %v", err))
return
Expand Down Expand Up @@ -317,7 +364,7 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
Options: map[string]string{
"Provider": info.Name, // vendor name
"MountPoint": h.mountPoint,
"ContainerMountPoint": h.task.CSIPluginConfig.MountDir,
"ContainerMountPoint": "/local/csi",
},
}
}
Expand Down Expand Up @@ -348,8 +395,9 @@ func (h *csiPluginSupervisorHook) registerPlugin(client csi.CSIPlugin, socketPat
// closes over its own registration
rname := reg.Name
rtype := reg.Type
allocID := reg.AllocID
deregistrationFns = append(deregistrationFns, func() {
err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname)
err := h.runner.dynamicRegistry.DeregisterPlugin(rtype, rname, allocID)
if err != nil {
h.logger.Error("failed to deregister csi plugin", "name", rname, "type", rtype, "error", err)
}
Expand Down Expand Up @@ -384,6 +432,10 @@ func (h *csiPluginSupervisorHook) supervisorLoopOnce(ctx context.Context, client
// Stop hooks must be idempotent. The context is cancelled prematurely if the
// task is killed.
func (h *csiPluginSupervisorHook) Stop(_ context.Context, req *interfaces.TaskStopRequest, _ *interfaces.TaskStopResponse) error {
err := os.RemoveAll(h.socketMountPoint)
if err != nil {
h.logger.Error("could not remove plugin socket directory", "dir", h.socketMountPoint, "error", err)
}
h.shutdownCancelFn()
return nil
}
Expand Down
94 changes: 74 additions & 20 deletions client/dynamicplugins/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package dynamicplugins

import (
"container/list"
"context"
"errors"
"fmt"
Expand All @@ -19,10 +20,11 @@ const (
// that are running as Nomad Tasks.
type Registry interface {
RegisterPlugin(info *PluginInfo) error
DeregisterPlugin(ptype, name string) error
DeregisterPlugin(ptype, name, allocID string) error

ListPlugins(ptype string) []*PluginInfo
DispensePlugin(ptype, name string) (interface{}, error)
PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error)

PluginsUpdatedCh(ctx context.Context, ptype string) <-chan *PluginUpdateEvent

Expand All @@ -31,10 +33,11 @@ type Registry interface {
StubDispenserForType(ptype string, dispenser PluginDispenser)
}

// RegistryState is what we persist in the client state store. It contains
// a map of plugin types to maps of plugin name -> PluginInfo.
// RegistryState is what we persist in the client state
// store. It contains a map of plugin types to maps of plugin name ->
// list of *PluginInfo, sorted by recency of registration
type RegistryState struct {
Plugins map[string]map[string]*PluginInfo
Plugins map[string]map[string]*list.List
}

type PluginDispenser func(info *PluginInfo) (interface{}, error)
Expand All @@ -44,7 +47,7 @@ type PluginDispenser func(info *PluginInfo) (interface{}, error)
func NewRegistry(state StateStorage, dispensers map[string]PluginDispenser) Registry {

registry := &dynamicRegistry{
plugins: make(map[string]map[string]*PluginInfo),
plugins: make(map[string]map[string]*list.List),
broadcasters: make(map[string]*pluginEventBroadcaster),
dispensers: dispensers,
state: state,
Expand Down Expand Up @@ -122,7 +125,7 @@ type PluginUpdateEvent struct {
}

type dynamicRegistry struct {
plugins map[string]map[string]*PluginInfo
plugins map[string]map[string]*list.List
pluginsLock sync.RWMutex

broadcasters map[string]*pluginEventBroadcaster
Expand Down Expand Up @@ -180,18 +183,35 @@ func (d *dynamicRegistry) RegisterPlugin(info *PluginInfo) error {

pmap, ok := d.plugins[info.Type]
if !ok {
pmap = make(map[string]*PluginInfo, 1)
pmap = make(map[string]*list.List)
d.plugins[info.Type] = pmap
}
infos, ok := pmap[info.Name]
if !ok {
infos = list.New()
pmap[info.Name] = infos
}

pmap[info.Name] = info

broadcaster := d.broadcasterForPluginType(info.Type)
event := &PluginUpdateEvent{
EventType: EventTypeRegistered,
Info: info,
// TODO(tgross): https://github.com/hashicorp/nomad/issues/11786
// If we're already registered, we should update the definition
// and send a broadcast of any update so the instanceManager can
// be restarted if there's been a change
var alreadyRegistered bool
for e := infos.Front(); e != nil; e = e.Next() {
if e.Value.(*PluginInfo).AllocID == info.AllocID {
alreadyRegistered = true
break
}
}
if !alreadyRegistered {
infos.PushFront(info)
broadcaster := d.broadcasterForPluginType(info.Type)
event := &PluginUpdateEvent{
EventType: EventTypeRegistered,
Info: info,
}
broadcaster.broadcast(event)
}
broadcaster.broadcast(event)

return d.sync()
}
Expand All @@ -209,7 +229,7 @@ func (d *dynamicRegistry) broadcasterForPluginType(ptype string) *pluginEventBro
return broadcaster
}

func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error {
func (d *dynamicRegistry) DeregisterPlugin(ptype, name, allocID string) error {
d.pluginsLock.Lock()
defer d.pluginsLock.Unlock()

Expand All @@ -223,19 +243,30 @@ func (d *dynamicRegistry) DeregisterPlugin(ptype, name string) error {
// developers during the development of new plugin types.
return errors.New("must specify plugin name to deregister")
}
if allocID == "" {
return errors.New("must specify plugin allocation ID to deregister")
}

pmap, ok := d.plugins[ptype]
if !ok {
// If this occurs there's a bug in the registration handler.
return fmt.Errorf("no plugins registered for type: %s", ptype)
}

info, ok := pmap[name]
infos, ok := pmap[name]
if !ok {
// plugin already deregistered, don't send events or try re-deleting.
return nil
}
delete(pmap, name)

var info *PluginInfo
for e := infos.Front(); e != nil; e = e.Next() {
info = e.Value.(*PluginInfo)
if info.AllocID == allocID {
infos.Remove(e)
break
}
}

broadcaster := d.broadcasterForPluginType(ptype)
event := &PluginUpdateEvent{
Expand All @@ -259,7 +290,9 @@ func (d *dynamicRegistry) ListPlugins(ptype string) []*PluginInfo {
plugins := make([]*PluginInfo, 0, len(pmap))

for _, info := range pmap {
plugins = append(plugins, info)
if info.Front() != nil {
plugins = append(plugins, info.Front().Value.(*PluginInfo))
}
}

return plugins
Expand Down Expand Up @@ -302,11 +335,32 @@ func (d *dynamicRegistry) DispensePlugin(ptype string, name string) (interface{}
}

info, ok := pmap[name]
if !ok {
if !ok || info.Front() == nil {
return nil, fmt.Errorf("plugin %s for type %s not found", name, ptype)
}

return dispenseFunc(info)
return dispenseFunc(info.Front().Value.(*PluginInfo))
}

func (d *dynamicRegistry) PluginForAlloc(ptype, name, allocID string) (*PluginInfo, error) {
d.pluginsLock.Lock()
defer d.pluginsLock.Unlock()

pmap, ok := d.plugins[ptype]
if !ok {
return nil, fmt.Errorf("no plugins registered for type: %s", ptype)
}

infos, ok := pmap[name]
if ok {
for e := infos.Front(); e != nil; e = e.Next() {
plugin := e.Value.(*PluginInfo)
if plugin.AllocID == allocID {
return plugin, nil
}
}
}
return nil, fmt.Errorf("no plugin for that allocation")
}

// PluginsUpdatedCh returns a channel over which plugin events for the requested
Expand Down
Loading

0 comments on commit 7bcf0af

Please sign in to comment.