dynamic host volumes: send register to client for fingerprint
When we register a volume without a plugin, we need to send a client RPC so that
the node fingerprint can be updated. The registered volume also needs to be
written to client state so that we can restore the fingerprint after a restart.
tgross committed Jan 7, 2025
1 parent 024c504 commit fa258c8
Showing 9 changed files with 379 additions and 119 deletions.
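For orientation, the register request that these changes thread through the client looks roughly like the sketch below. The field list is inferred from what this diff reads (req.ID, req.Name, req.NodeID, req.HostPath, req.Parameters); the authoritative definition lives in client/structs and may differ:

// Sketch only: fields inferred from this diff, not the canonical definition.
type ClientHostVolumeRegisterRequest struct {
	ID         string            // volume ID assigned at registration
	Name       string            // volume name, unique per client node
	NodeID     string            // node the volume is registered against
	HostPath   string            // pre-existing host path to verify and fingerprint
	Parameters map[string]string // opaque metadata stored with the volume (type assumed)
}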
18 changes: 18 additions & 0 deletions client/host_volume_endpoint.go
@@ -43,6 +43,24 @@ func (v *HostVolume) Create(
	return nil
}

func (v *HostVolume) Register(
	req *cstructs.ClientHostVolumeRegisterRequest,
	resp *cstructs.ClientHostVolumeRegisterResponse) error {

	defer metrics.MeasureSince([]string{"client", "host_volume", "register"}, time.Now())
	ctx, cancelFn := v.requestContext()
	defer cancelFn()

	err := v.c.hostVolumeManager.Register(ctx, req)
	if err != nil {
		v.c.logger.Error("failed to register host volume", "name", req.Name, "error", err)
		return err
	}

	v.c.logger.Info("registered host volume", "id", req.ID, "path", req.HostPath)
	return nil
}

func (v *HostVolume) Delete(
	req *cstructs.ClientHostVolumeDeleteRequest,
	resp *cstructs.ClientHostVolumeDeleteResponse) error {
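The new endpoint mirrors the existing Create and Delete handlers: measure, build a request context, delegate to the host volume manager, and log the outcome. A test could exercise it in-process along these lines (a sketch: it assumes the client's ClientRPC test helper and uses illustrative values):

req := &cstructs.ClientHostVolumeRegisterRequest{
	ID:       "vol-01",                          // illustrative ID
	Name:     "example",                         // illustrative name
	HostPath: "/var/nomad/host-volumes/example", // must already exist on the host
}
var resp cstructs.ClientHostVolumeRegisterResponse
// Routes to the new HostVolume.Register handler on this client.
err := client.ClientRPC("HostVolume.Register", req, &resp)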
156 changes: 122 additions & 34 deletions client/hostvolumemanager/host_volumes.go
@@ -6,6 +6,8 @@ package hostvolumemanager
import (
	"context"
	"errors"
	"fmt"
	"os"
	"path/filepath"
	"sync"
@@ -101,6 +103,7 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,

	volState := &cstructs.HostVolumeState{
		ID:        req.ID,
		HostPath:  pluginResp.Path,
		CreateReq: req,
	}
	if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil {
@@ -135,19 +138,63 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
	return resp, nil
}

// Register saves the request to state, and updates the node with the volume.
func (hvm *HostVolumeManager) Register(ctx context.Context,
	req *cstructs.ClientHostVolumeRegisterRequest) error {

	// can't have two of the same volume name w/ different IDs per client node
	if err := hvm.locker.lock(req.Name, req.ID); err != nil {
		return err
	}

	_, err := os.Stat(req.HostPath)
	if err != nil {
		hvm.locker.release(req.Name)
		return fmt.Errorf("could not verify host path for %q: %w", req.Name, err)
	}

	// generate a stub create request and plugin response for the fingerprint
	// and client state
	creq := &cstructs.ClientHostVolumeCreateRequest{
		ID:         req.ID,
		Name:       req.Name,
		NodeID:     req.NodeID,
		Parameters: req.Parameters,
	}
	pluginResp := &HostVolumePluginCreateResponse{
		Path: req.HostPath,
	}

	volState := &cstructs.HostVolumeState{
		ID:        req.ID,
		CreateReq: creq,
		HostPath:  req.HostPath,
	}
	if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil {
		hvm.log.Error("failed to save volume in state", "volume_id", req.ID, "error", err)
		hvm.locker.release(req.Name)
		return helper.FlattenMultierror(err)
	}

	hvm.updateNodeVols(req.Name, genVolConfig(creq, pluginResp))
	return nil
}
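Note the ordering above: the name lock is taken first, and every failure path releases it before returning, so a failed register never leaves a name permanently claimed. The lock contract implied by the comment — one name maps to at most one volume ID per node — would behave like this sketch (method names from the diff; the sequence itself is illustrative):

// Illustrative sequence, not a test from this commit:
err := hvm.locker.lock("shared-data", "vol-01") // name free: succeeds, err == nil
err = hvm.locker.lock("shared-data", "vol-02")  // same name, different ID: err != nil
hvm.locker.release("shared-data")               // frees the name for future creates/registers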

// Delete runs the appropriate plugin for the given request, removes it from
// state, and updates the node to remove the volume.
func (hvm *HostVolumeManager) Delete(ctx context.Context,
	req *cstructs.ClientHostVolumeDeleteRequest) (*cstructs.ClientHostVolumeDeleteResponse, error) {

-	plug, err := hvm.getPlugin(req.PluginID)
-	if err != nil {
-		return nil, err
-	}
-
-	err = plug.Delete(ctx, req)
-	if err != nil {
-		return nil, err
-	}
+	if req.PluginID != "" {
+		plug, err := hvm.getPlugin(req.PluginID)
+		if err != nil {
+			return nil, err
+		}
+
+		err = plug.Delete(ctx, req)
+		if err != nil {
+			return nil, err
+		}
+	}

	if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil {
@@ -193,41 +240,22 @@ func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap,
		return volumes, nil // nothing to do
	}

-	// re-"create" the volumes - plugins have the best knowledge of their
-	// side effects, and they must be idempotent.
	group := multierror.Group{}
	for _, vol := range vols {
		group.Go(func() error {
-			// missing plugins with associated volumes in state are considered
-			// client-stopping errors. they need to be fixed by cluster admins.
-			plug, err := hvm.getPlugin(vol.CreateReq.PluginID)
-			if err != nil {
-				return err
-			}
-
-			// lock the name so future creates can't produce duplicates.
-			err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
-			// state should never have duplicate vol names, and restore happens
-			// prior to node registration, so new creates shouldn't come in
-			// concurrently, but check for error just in case.
-			if err != nil {
-				hvm.log.Error("error during restore",
-					"volume_name", vol.CreateReq.Name,
-					"volume_id", vol.CreateReq.ID,
-					"error", err)
-				// don't stop the world if it does happen, because an admin
-				// couldn't do anything about it short of wiping client state.
-				return nil
-			}
-
-			resp, err := plug.Create(ctx, vol.CreateReq)
-			if err != nil {
-				// plugin execution errors are only logged
-				hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
-				return nil
-			}
+			var volCfg *structs.ClientHostVolumeConfig
+			var err error
+			if vol.CreateReq.PluginID == "" {
+				volCfg, err = hvm.restoreForRegister(vol)
+			} else {
+				volCfg, err = hvm.restoreForCreate(ctx, vol)
+			}
+			if err != nil {
+				return err
+			}
			mut.Lock()
-			volumes[vol.CreateReq.Name] = genVolConfig(vol.CreateReq, resp)
+			volumes[vol.CreateReq.Name] = volCfg
			mut.Unlock()
			return nil
		})
@@ -236,6 +264,66 @@ func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap,
	return volumes, helper.FlattenMultierror(mErr.ErrorOrNil())
}
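restoreFromState fans the per-volume work out through hashicorp/go-multierror's Group, which runs each closure in a goroutine and gathers any failures into one multierror on Wait. A condensed sketch of that pattern, assuming only the go-multierror package (restoreOne is a hypothetical stand-in for the per-volume work):

import "github.com/hashicorp/go-multierror"

// restoreAll sketches the fan-out/collect pattern used above.
func restoreAll(vols []string, restoreOne func(string) error) error {
	group := multierror.Group{}
	for _, vol := range vols {
		vol := vol // capture per iteration (needed before Go 1.22)
		group.Go(func() error {
			return restoreOne(vol)
		})
	}
	mErr := group.Wait()     // *multierror.Error gathering goroutine failures
	return mErr.ErrorOrNil() // nil when every restore succeeded
}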

// restoreForCreate restores a single volume that was previously created by
// Create, by "recreating" the volume. Plugins have the best knowledge of their
// side effects, and they must be idempotent.
func (hvm *HostVolumeManager) restoreForCreate(ctx context.Context, vol *cstructs.HostVolumeState) (*structs.ClientHostVolumeConfig, error) {
	// missing plugins with associated volumes in state are considered
	// client-stopping errors. they need to be fixed by cluster admins.
	plug, err := hvm.getPlugin(vol.CreateReq.PluginID)
	if err != nil {
		return nil, err
	}

	// lock the name so future creates can't produce duplicates.
	err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
	// state should never have duplicate vol names, and restore happens
	// prior to node registration, so new creates shouldn't come in
	// concurrently, but check for error just in case.
	if err != nil {
		hvm.log.Error("error during restore",
			"volume_name", vol.CreateReq.Name,
			"volume_id", vol.CreateReq.ID,
			"error", err)
		// don't stop the world if it does happen, because an admin
		// couldn't do anything about it short of wiping client state.
		return nil, nil
	}

	resp, err := plug.Create(ctx, vol.CreateReq)
	if err != nil {
		// plugin execution errors are only logged
		hvm.log.Error("failed to restore",
			"plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
		return nil, nil
	}

	return genVolConfig(vol.CreateReq, resp), nil
}

// restoreForRegister restores a single volume that was previously created by
// Register, by converting the stored struct. It otherwise behaves the same as
// restoreForCreate.
func (hvm *HostVolumeManager) restoreForRegister(vol *cstructs.HostVolumeState) (*structs.ClientHostVolumeConfig, error) {
	err := hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
	if err != nil {
		hvm.log.Error("error during restore",
			"volume_name", vol.CreateReq.Name,
			"volume_id", vol.CreateReq.ID,
			"error", err)
		return nil, nil
	}

	_, err = os.Stat(vol.HostPath)
	if err != nil {
		hvm.log.Error("failed to restore: could not verify host path",
			"volume_id", vol.ID, "error", err, "path", vol.HostPath)
		return nil, nil
	}

	return genVolConfig(vol.CreateReq, &HostVolumePluginCreateResponse{Path: vol.HostPath}), nil
}
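Both restore paths depend on what was persisted when the volume was created or registered. Pieced together from the fields this diff sets, the stored record carries the ID, the resolved host path, and the (real or stubbed) create request — roughly:

// Sketch assembled from the fields this diff sets; the canonical struct
// lives in client/structs.
type HostVolumeState struct {
	ID        string                         // volume ID
	HostPath  string                         // path from the plugin, or given at register time
	CreateReq *ClientHostVolumeCreateRequest // real request (Create) or stub (Register)
}

Persisting HostPath directly is what lets restoreForRegister rebuild the fingerprint with a simple os.Stat instead of re-running a plugin.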

// genVolConfig generates the host volume config for the node to report as
// available to the servers for job scheduling.
func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumePluginCreateResponse) *structs.ClientHostVolumeConfig {