dynamic host volumes: send register to client for fingerprint
When we register a volume without a plugin, we need to send a client RPC so that
the node fingerprint can be updated. The registered volume also needs to be
written to client state so that we can restore the fingerprint after a restart.
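
A minimal sketch of how the new client RPC is exercised, modeled on the test in this commit. The registerExample helper, its literal values, and its placement in package client are illustrative assumptions; only the request/response types, the "HostVolume.Register" RPC name, and ClientRPC itself come from the change below. Unlike HostVolume.Create, no plugin runs: the host path must already exist, and the client only stat()s it before writing the volume to client state and adding it to the node fingerprint.

package client

import (
	cstructs "github.com/hashicorp/nomad/client/structs"
	"github.com/hashicorp/nomad/helper/uuid"
)

// registerExample is a hypothetical helper that registers a pre-existing
// host directory as a dynamic host volume without invoking any plugin.
func registerExample(c *Client, nodeID, hostPath string) error {
	req := &cstructs.ClientHostVolumeRegisterRequest{
		ID:            uuid.Generate(),
		Name:          "registered-volume", // illustrative name
		NodeID:        nodeID,
		HostPath:      hostPath, // must already exist on the host
		CapacityBytes: 1000,
	}
	var resp cstructs.ClientHostVolumeRegisterResponse
	// writes the volume to client state and updates the node fingerprint
	return c.ClientRPC("HostVolume.Register", req, &resp)
}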
tgross committed Jan 7, 2025
1 parent 024c504 commit 6ac78c6
Showing 11 changed files with 470 additions and 132 deletions.
18 changes: 18 additions & 0 deletions client/host_volume_endpoint.go
@@ -43,6 +43,24 @@ func (v *HostVolume) Create(
return nil
}

func (v *HostVolume) Register(
req *cstructs.ClientHostVolumeRegisterRequest,
resp *cstructs.ClientHostVolumeRegisterResponse) error {

defer metrics.MeasureSince([]string{"client", "host_volume", "register"}, time.Now())
ctx, cancelFn := v.requestContext()
defer cancelFn()

err := v.c.hostVolumeManager.Register(ctx, req)
if err != nil {
v.c.logger.Error("failed to register host volume", "name", req.Name, "error", err)
return err
}

v.c.logger.Info("registered host volume", "id", req.ID, "path", req.HostPath)
return nil
}

func (v *HostVolume) Delete(
req *cstructs.ClientHostVolumeDeleteRequest,
resp *cstructs.ClientHostVolumeDeleteResponse) error {
89 changes: 76 additions & 13 deletions client/host_volume_endpoint_test.go
@@ -5,14 +5,15 @@ package client

import (
"path/filepath"
"sort"
"testing"

"github.com/hashicorp/nomad/ci"
hvm "github.com/hashicorp/nomad/client/hostvolumemanager"
"github.com/hashicorp/nomad/client/state"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/helper/testlog"
- "github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/helper/uuid"
"github.com/shoenig/test/must"
)

@@ -33,15 +34,17 @@ func TestHostVolume(t *testing.T) {
SharedMountDir: tmp,
})
client.hostVolumeManager = manager
- expectDir := filepath.Join(tmp, "test-vol-id")
expectDir := filepath.Join(tmp, "test-vol-id-1")

hostPath := t.TempDir()

t.Run("happy", func(t *testing.T) {

/* create */

req := &cstructs.ClientHostVolumeCreateRequest{
- Name: "test-vol-name",
- ID: "test-vol-id",
Name: "created-volume",
ID: "test-vol-id-1",
PluginID: "mkdir", // real plugin really makes a dir
}
var resp cstructs.ClientHostVolumeCreateResponse
@@ -60,6 +63,7 @@ func TestHostVolume(t *testing.T) {
expectState := &cstructs.HostVolumeState{
ID: req.ID,
CreateReq: req,
HostPath: expectDir,
}
must.Eq(t, expectState, vols[0])
// and should be fingerprinted
@@ -71,11 +75,53 @@
},
}, client.Node().HostVolumes)

/* register */

regReq := &cstructs.ClientHostVolumeRegisterRequest{
ID: "test-vol-id-2",
Name: "registered-volume",
NodeID: uuid.Generate(),
HostPath: hostPath,
CapacityBytes: 1000,
}
var regResp cstructs.ClientHostVolumeRegisterResponse
err = client.ClientRPC("HostVolume.Register", regReq, &regResp)
must.NoError(t, err)

// ensure we saved to client state
vols, err = memdb.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 2, vols)
sort.Slice(vols, func(i, j int) bool { return vols[i].ID < vols[j].ID })
expectState = &cstructs.HostVolumeState{
ID: regReq.ID,
HostPath: hostPath,
CreateReq: &cstructs.ClientHostVolumeCreateRequest{
ID: regReq.ID,
Name: regReq.Name,
NodeID: regReq.NodeID,
},
}
must.Eq(t, expectState, vols[1])
// and should be fingerprinted
must.Eq(t, hvm.VolumeMap{
req.Name: {
ID: req.ID,
Name: req.Name,
Path: expectDir,
},
regReq.Name: {
ID: regReq.ID,
Name: regReq.Name,
Path: hostPath,
},
}, client.Node().HostVolumes)

/* delete */

delReq := &cstructs.ClientHostVolumeDeleteRequest{
- Name: "test-vol-name",
- ID: "test-vol-id",
Name: "created-volume",
ID: "test-vol-id-1",
PluginID: "mkdir",
HostPath: expectDir,
}
@@ -88,9 +134,26 @@
// client state should be deleted
vols, err = memdb.GetDynamicHostVolumes()
must.NoError(t, err)
- must.Len(t, 0, vols)
must.Len(t, 1, vols)
// and the fingerprint, too
- must.Eq(t, map[string]*structs.ClientHostVolumeConfig{}, client.Node().HostVolumes)
must.Eq(t, hvm.VolumeMap{
regReq.Name: {
ID: regReq.ID,
Name: regReq.Name,
Path: hostPath,
},
}, client.Node().HostVolumes)

delReq.Name = "registered-volume"
delReq.ID = "test-vol-id-2"
err = client.ClientRPC("HostVolume.Delete", delReq, &delResp)
must.NoError(t, err)
must.NotNil(t, delResp)

vols, err = memdb.GetDynamicHostVolumes()
must.NoError(t, err)
must.Len(t, 0, vols)
must.Eq(t, hvm.VolumeMap{}, client.Node().HostVolumes)
})

t.Run("missing plugin", func(t *testing.T) {
@@ -119,20 +182,20 @@ func TestHostVolume(t *testing.T) {
})

req := &cstructs.ClientHostVolumeCreateRequest{
- ID: "test-vol-id",
- Name: "test-vol-name",
ID: "test-vol-id-1",
Name: "created-volume",
PluginID: "mkdir",
}
var resp cstructs.ClientHostVolumeCreateResponse
err := client.ClientRPC("HostVolume.Create", req, &resp)
- must.ErrorContains(t, err, "host_volume_endpoint_test.go/test-vol-id: not a directory")
must.ErrorContains(t, err, "host_volume_endpoint_test.go/test-vol-id-1: not a directory")

delReq := &cstructs.ClientHostVolumeDeleteRequest{
- ID: "test-vol-id",
ID: "test-vol-id-1",
PluginID: "mkdir",
}
var delResp cstructs.ClientHostVolumeDeleteResponse
err = client.ClientRPC("HostVolume.Delete", delReq, &delResp)
- must.ErrorContains(t, err, "host_volume_endpoint_test.go/test-vol-id: not a directory")
must.ErrorContains(t, err, "host_volume_endpoint_test.go/test-vol-id-1: not a directory")
})
}
156 changes: 122 additions & 34 deletions client/hostvolumemanager/host_volumes.go
@@ -6,6 +6,8 @@ package hostvolumemanager
import (
"context"
"errors"
"fmt"
"os"
"path/filepath"
"sync"

@@ -101,6 +103,7 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,

volState := &cstructs.HostVolumeState{
ID: req.ID,
HostPath: pluginResp.Path,
CreateReq: req,
}
if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil {
@@ -135,19 +138,63 @@ func (hvm *HostVolumeManager) Create(ctx context.Context,
return resp, nil
}

// Register saves the request to state, and updates the node with the volume.
func (hvm *HostVolumeManager) Register(ctx context.Context,
req *cstructs.ClientHostVolumeRegisterRequest) error {

// can't have two of the same volume name w/ different IDs per client node
if err := hvm.locker.lock(req.Name, req.ID); err != nil {
return err
}

_, err := os.Stat(req.HostPath)
if err != nil {
hvm.locker.release(req.Name)
return fmt.Errorf("could not verify host path for %q: %w", req.Name, err)
}

// generate a stub create request and plugin response for the fingerprint
// and client state
creq := &cstructs.ClientHostVolumeCreateRequest{
ID: req.ID,
Name: req.Name,
NodeID: req.NodeID,
Parameters: req.Parameters,
}
pluginResp := &HostVolumePluginCreateResponse{
Path: req.HostPath,
}

volState := &cstructs.HostVolumeState{
ID: req.ID,
CreateReq: creq,
HostPath: req.HostPath,
}
if err := hvm.stateMgr.PutDynamicHostVolume(volState); err != nil {
hvm.log.Error("failed to save volume in state", "volume_id", req.ID, "error", err)
hvm.locker.release(req.Name)
return helper.FlattenMultierror(err)
}

hvm.updateNodeVols(req.Name, genVolConfig(creq, pluginResp))
return nil
}

// Delete runs the appropriate plugin for the given request, removes it from
// state, and updates the node to remove the volume.
func (hvm *HostVolumeManager) Delete(ctx context.Context,
req *cstructs.ClientHostVolumeDeleteRequest) (*cstructs.ClientHostVolumeDeleteResponse, error) {

- plug, err := hvm.getPlugin(req.PluginID)
- if err != nil {
- return nil, err
- }
if req.PluginID != "" {
plug, err := hvm.getPlugin(req.PluginID)
if err != nil {
return nil, err
}

- err = plug.Delete(ctx, req)
- if err != nil {
- return nil, err
err = plug.Delete(ctx, req)
if err != nil {
return nil, err
}
}

if err := hvm.stateMgr.DeleteDynamicHostVolume(req.ID); err != nil {
@@ -193,41 +240,22 @@ func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap,
return volumes, nil // nothing to do
}

- // re-"create" the volumes - plugins have the best knowledge of their
- // side effects, and they must be idempotent.
group := multierror.Group{}
for _, vol := range vols {
group.Go(func() error {
- // missing plugins with associated volumes in state are considered
- // client-stopping errors. they need to be fixed by cluster admins.
- plug, err := hvm.getPlugin(vol.CreateReq.PluginID)
- if err != nil {
- return err
var volCfg *structs.ClientHostVolumeConfig
var err error
if vol.CreateReq.PluginID == "" {
volCfg, err = hvm.restoreForRegister(vol)
} else {
volCfg, err = hvm.restoreForCreate(ctx, vol)
}

- // lock the name so future creates can't produce duplicates.
- err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
- // state should never have duplicate vol names, and restore happens
- // prior to node registration, so new creates shouldn't come in
- // concurrently, but check for error just in case.
if err != nil {
- hvm.log.Error("error during restore",
- "volume_name", vol.CreateReq.Name,
- "volume_id", vol.CreateReq.ID,
- "error", err)
- // don't stop the world if it does happen, because an admin
- // couldn't do anything about it short of wiping client state.
- return nil
return err
}

- resp, err := plug.Create(ctx, vol.CreateReq)
- if err != nil {
- // plugin execution errors are only logged
- hvm.log.Error("failed to restore", "plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
- return nil
- }
mut.Lock()
- volumes[vol.CreateReq.Name] = genVolConfig(vol.CreateReq, resp)
volumes[vol.CreateReq.Name] = volCfg
mut.Unlock()
return nil
})
@@ -236,6 +264,66 @@ func (hvm *HostVolumeManager) restoreFromState(ctx context.Context) (VolumeMap,
return volumes, helper.FlattenMultierror(mErr.ErrorOrNil())
}

// restoreForCreate restores a single volume that was previously created by
// Create, by "recreating" the volumes. Plugins have the best knowledge of their
// side effects, and they must be idempotent.
func (hvm *HostVolumeManager) restoreForCreate(ctx context.Context, vol *cstructs.HostVolumeState) (*structs.ClientHostVolumeConfig, error) {
// missing plugins with associated volumes in state are considered
// client-stopping errors. they need to be fixed by cluster admins.
plug, err := hvm.getPlugin(vol.CreateReq.PluginID)
if err != nil {
return nil, err
}

// lock the name so future creates can't produce duplicates.
err = hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
// state should never have duplicate vol names, and restore happens
// prior to node registration, so new creates shouldn't come in
// concurrently, but check for error just in case.
if err != nil {
hvm.log.Error("error during restore",
"volume_name", vol.CreateReq.Name,
"volume_id", vol.CreateReq.ID,
"error", err)
// don't stop the world if it does happen, because an admin
// couldn't do anything about it short of wiping client state.
return nil, nil
}

resp, err := plug.Create(ctx, vol.CreateReq)
if err != nil {
// plugin execution errors are only logged
hvm.log.Error("failed to restore",
"plugin_id", vol.CreateReq.PluginID, "volume_id", vol.ID, "error", err)
return nil, nil
}

return genVolConfig(vol.CreateReq, resp), nil
}

// restoreForRegister restores a single volume that was previously created by
// Register, by converting the stored struct. It otherwise behaves the same as
// restoreForCreate.
func (hvm *HostVolumeManager) restoreForRegister(vol *cstructs.HostVolumeState) (*structs.ClientHostVolumeConfig, error) {
err := hvm.locker.lock(vol.CreateReq.Name, vol.CreateReq.ID)
if err != nil {
hvm.log.Error("error during restore",
"volume_name", vol.CreateReq.Name,
"volume_id", vol.CreateReq.ID,
"error", err)
return nil, nil
}

_, err = os.Stat(vol.HostPath)
if err != nil {
hvm.log.Error("failed to restore: could not verify host path",
"volume_id", vol.ID, "error", err, "path", vol.HostPath)
return nil, nil
}

return genVolConfig(vol.CreateReq, &HostVolumePluginCreateResponse{Path: vol.HostPath}), nil
}

// genVolConfig generates the host volume config for the node to report as
// available to the servers for job scheduling.
func genVolConfig(req *cstructs.ClientHostVolumeCreateRequest, resp *HostVolumePluginCreateResponse) *structs.ClientHostVolumeConfig {
[diffs for the remaining 8 changed files not rendered]
