From 64b1b2c1c9cb0bdd76135f0dacd0a305c1faeb62 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Thu, 20 Jan 2022 11:30:08 -0500 Subject: [PATCH 1/2] csi: resolve invalid claim states on read It's currently possible for CSI volumes to be claimed by allocations that no longer exist. This changeset asserts a reasonable state at the state store level by registering these nil allocations as "past claims" on any read. This will cause any pass through the periodic GC or volumewatcher to trigger the unpublishing workflow for those claims. --- .changelog/11890.txt | 3 + nomad/core_sched.go | 4 + nomad/core_sched_test.go | 55 +++++- nomad/state/state_store.go | 26 +++ nomad/state/testing.go | 187 ++++++++++++++++++++ nomad/volumewatcher/volume_watcher_test.go | 29 +++ nomad/volumewatcher/volumes_watcher_test.go | 7 +- 7 files changed, 305 insertions(+), 6 deletions(-) create mode 100644 .changelog/11890.txt diff --git a/.changelog/11890.txt b/.changelog/11890.txt new file mode 100644 index 00000000000..1074aa29cb4 --- /dev/null +++ b/.changelog/11890.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: Fixed a bug where garbage collected allocations could block new claims on a volume +``` diff --git a/nomad/core_sched.go b/nomad/core_sched.go index cffb6114ba2..4306783efcb 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -782,6 +782,10 @@ NEXT_VOLUME: continue } + // TODO(tgross): consider moving the TerminalStatus check into + // the denormalize volume logic so that we can just check the + // volume for past claims + // we only call the claim release RPC if the volume has claims // that no longer have valid allocations. otherwise we'd send // out a lot of do-nothing RPCs. diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index a19d4395b07..095975a31f6 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -2383,19 +2383,64 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { c := core.(*CoreScheduler) require.NoError(c.csiVolumeClaimGC(gc)) - // the volumewatcher will hit an error here because there's no - // path to the node. but we can't update the claim to bypass the - // client RPCs without triggering the volumewatcher's normal code - // path. + // TODO(tgross): the condition below means this test doesn't tell + // us much; ideally we should be intercepting the claim request + // and verifying that we send the expected claims but we don't + // have test infra in place to do that for server RPCs + + // sending the GC claim will trigger the volumewatcher's normal + // code path. but the volumewatcher will hit an error here + // because there's no path to the node, so we shouldn't see + // the WriteClaims removed require.Eventually(func() bool { vol, _ := state.CSIVolumeByID(ws, ns, volID) return len(vol.WriteClaims) == 1 && len(vol.WriteAllocs) == 1 && - len(vol.PastClaims) == 0 + len(vol.PastClaims) == 1 }, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly") } +func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) { + t.Parallel() + require := require.New(t) + + srv, shutdown := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + + defer shutdown() + testutil.WaitForLeader(t, srv.RPC) + + err := state.TestBadCSIState(t, srv.State()) + require.NoError(err) + + snap, err := srv.State().Snapshot() + require.NoError(err) + core := NewCoreScheduler(srv, snap) + + index, _ := srv.State().LatestIndex() + index++ + gc := srv.coreJobEval(structs.CoreJobForceGC, index) + c := core.(*CoreScheduler) + require.NoError(c.csiVolumeClaimGC(gc)) + + require.Eventually(func() bool { + vol, _ := srv.State().CSIVolumeByID(nil, + structs.DefaultNamespace, "csi-volume-nfs0") + if len(vol.PastClaims) != 2 { + return false + } + for _, claim := range vol.PastClaims { + if claim.State != structs.CSIVolumeClaimStateUnpublishing { + return false + } + } + return true + }, time.Second*1, 10*time.Millisecond, "invalid claims should be marked for GC") + +} + func TestCoreScheduler_FailLoop(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 7d8decfdd79..7551ad04535 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2513,6 +2513,18 @@ func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st State: structs.CSIVolumeClaimStateTaken, } } + } else if _, ok := vol.PastClaims[id]; !ok { + // ensure that any allocs that have been GC'd since + // our last read are marked as past claims + vol.PastClaims[id] = &structs.CSIVolumeClaim{ + AllocationID: id, + Mode: structs.CSIVolumeClaimRead, + State: structs.CSIVolumeClaimStateUnpublishing, + } + readClaim := vol.ReadClaims[id] + if readClaim != nil { + vol.PastClaims[id].NodeID = readClaim.NodeID + } } } @@ -2531,6 +2543,20 @@ func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st State: structs.CSIVolumeClaimStateTaken, } } + } else if _, ok := vol.PastClaims[id]; !ok { + // ensure that any allocs that have been GC'd since + // our last read are marked as past claims + + vol.PastClaims[id] = &structs.CSIVolumeClaim{ + AllocationID: id, + Mode: structs.CSIVolumeClaimWrite, + State: structs.CSIVolumeClaimStateUnpublishing, + } + writeClaim := vol.WriteClaims[id] + if writeClaim != nil { + vol.PastClaims[id].NodeID = writeClaim.NodeID + } + } } diff --git a/nomad/state/testing.go b/nomad/state/testing.go index 460df609773..0d570a95ade 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -1,7 +1,9 @@ package state import ( + "math" "testing" + "time" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" @@ -124,3 +126,188 @@ func createTestCSIPlugin(s *StateStore, id string, requiresController bool) func s.DeleteNode(structs.MsgTypeTestSetup, index, ids) } } + +func TestBadCSIState(t testing.TB, store *StateStore) error { + + pluginID := "org.democratic-csi.nfs" + + controllerInfo := func(isHealthy bool) map[string]*structs.CSIInfo { + desc := "healthy" + if !isHealthy { + desc = "failed fingerprinting with error" + } + return map[string]*structs.CSIInfo{ + pluginID: { + PluginID: pluginID, + AllocID: uuid.Generate(), + Healthy: isHealthy, + HealthDescription: desc, + RequiresControllerPlugin: true, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsAttachDetach: true, + }, + }, + } + } + + nodeInfo := func(nodeName string, isHealthy bool) map[string]*structs.CSIInfo { + desc := "healthy" + if !isHealthy { + desc = "failed fingerprinting with error" + } + return map[string]*structs.CSIInfo{ + pluginID: { + PluginID: pluginID, + AllocID: uuid.Generate(), + Healthy: isHealthy, + HealthDescription: desc, + RequiresControllerPlugin: true, + NodeInfo: &structs.CSINodeInfo{ + ID: nodeName, + MaxVolumes: math.MaxInt64, + RequiresNodeStageVolume: true, + }, + }, + } + } + + nodes := make([]*structs.Node, 3) + for i := range nodes { + n := mock.Node() + n.Attributes["nomad.version"] = "1.2.4" + nodes[i] = n + } + + nodes[0].CSIControllerPlugins = controllerInfo(true) + nodes[0].CSINodePlugins = nodeInfo("nomad-client0", true) + + drainID := uuid.Generate() + + // drained node + nodes[1].CSIControllerPlugins = controllerInfo(false) + nodes[1].CSINodePlugins = nodeInfo("nomad-client1", false) + + nodes[1].LastDrain = &structs.DrainMetadata{ + StartedAt: time.Now().Add(-10 * time.Minute), + UpdatedAt: time.Now().Add(-30 * time.Second), + Status: structs.DrainStatusComplete, + AccessorID: drainID, + } + nodes[1].SchedulingEligibility = structs.NodeSchedulingIneligible + + // previously drained but now eligible + nodes[2].CSIControllerPlugins = controllerInfo(true) + nodes[2].CSINodePlugins = nodeInfo("nomad-client2", true) + nodes[2].LastDrain = &structs.DrainMetadata{ + StartedAt: time.Now().Add(-15 * time.Minute), + UpdatedAt: time.Now().Add(-5 * time.Minute), + Status: structs.DrainStatusComplete, + AccessorID: drainID, + } + nodes[2].SchedulingEligibility = structs.NodeSchedulingEligible + + // Insert nodes into the state store + index := uint64(999) + for _, n := range nodes { + index++ + err := store.UpsertNode(structs.MsgTypeTestSetup, index, n) + if err != nil { + return err + } + } + + allocID0 := uuid.Generate() // nil alloc + allocID2 := uuid.Generate() // nil alloc + + alloc1 := mock.Alloc() + alloc1.ClientStatus = "complete" + alloc1.DesiredStatus = "stop" + + // Insert allocs into the state store + err := store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1}) + if err != nil { + return err + } + + vol := &structs.CSIVolume{ + ID: "csi-volume-nfs0", + Name: "csi-volume-nfs0", + ExternalID: "csi-volume-nfs0", + Namespace: "default", + AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + MountOptions: &structs.CSIMountOptions{ + MountFlags: []string{"noatime"}, + }, + Context: map[string]string{ + "node_attach_driver": "nfs", + "provisioner_driver": "nfs-client", + "server": "192.168.56.69", + }, + Capacity: 0, + RequestedCapacityMin: 107374182, + RequestedCapacityMax: 107374182, + RequestedCapabilities: []*structs.CSIVolumeCapability{ + { + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + }, + { + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter, + }, + { + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, + }, + }, + WriteAllocs: map[string]*structs.Allocation{ + allocID0: nil, + alloc1.ID: nil, + allocID2: nil, + }, + WriteClaims: map[string]*structs.CSIVolumeClaim{ + allocID0: { + AllocationID: allocID0, + NodeID: nodes[0].ID, + Mode: structs.CSIVolumeClaimWrite, + AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + State: structs.CSIVolumeClaimStateTaken, + }, + alloc1.ID: { + AllocationID: alloc1.ID, + NodeID: nodes[1].ID, + Mode: structs.CSIVolumeClaimWrite, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + State: structs.CSIVolumeClaimStateTaken, + }, + allocID2: { + AllocationID: allocID2, + NodeID: nodes[2].ID, + Mode: structs.CSIVolumeClaimWrite, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + State: structs.CSIVolumeClaimStateTaken, + }, + }, + Schedulable: true, + PluginID: pluginID, + Provider: pluginID, + ProviderVersion: "1.4.3", + ControllerRequired: true, + ControllersHealthy: 2, + ControllersExpected: 2, + NodesHealthy: 2, + NodesExpected: 0, + } + + err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + if err != nil { + return err + } + + return nil +} diff --git a/nomad/volumewatcher/volume_watcher_test.go b/nomad/volumewatcher/volume_watcher_test.go index 848ca58b925..4e8a556a49c 100644 --- a/nomad/volumewatcher/volume_watcher_test.go +++ b/nomad/volumewatcher/volume_watcher_test.go @@ -75,3 +75,32 @@ func TestVolumeWatch_Reap(t *testing.T) { require.NoError(err) require.Len(vol.PastClaims, 2) // alloc claim + GC claim } + +func TestVolumeReapBadState(t *testing.T) { + + store := state.TestStateStore(t) + err := state.TestBadCSIState(t, store) + require.NoError(t, err) + srv := &MockRPCServer{ + state: store, + } + + vol, err := srv.state.CSIVolumeByID(nil, + structs.DefaultNamespace, "csi-volume-nfs0") + require.NoError(t, err) + srv.state.CSIVolumeDenormalize(nil, vol) + + ctx, exitFn := context.WithCancel(context.Background()) + w := &volumeWatcher{ + v: vol, + rpc: srv, + state: srv.State(), + ctx: ctx, + exitFn: exitFn, + logger: testlog.HCLogger(t), + } + + err = w.volumeReapImpl(vol) + require.NoError(t, err) + require.Equal(t, 2, srv.countCSIUnpublish) +} diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 7f0365be351..2271c2f2036 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -70,10 +70,15 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { alloc.ClientStatus = structs.AllocClientStatusComplete vol := testVolume(plugin, alloc, node.ID) + index++ + err := srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index, + []*structs.Allocation{alloc}) + require.NoError(err) + watcher.SetEnabled(true, srv.State(), "") index++ - err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + err = srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) require.NoError(err) // we should get or start up a watcher when we get an update for From e6d7ff4ec1654beb92d047313c1a0536b25f56b0 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Mon, 24 Jan 2022 14:25:41 -0500 Subject: [PATCH 2/2] csi: make feasibility check errors more understandable When the feasibility checker finds we have no free write claims, it checks to see if any of those claims are for the job we're currently scheduling (so that earlier versions of a job can't block claims for new versions) and reports a conflict if the volume can't be scheduled so that the user can fix their claims. But when the checker hits a claim that has a GCd allocation, the state is recoverable by the server once claim reaping completes and no user intervention is required; the blocked eval should complete. Differentiate the scheduler error produced by these two conditions. --- scheduler/feasible.go | 38 ++++++++++++++++++++++++-------------- 1 file changed, 24 insertions(+), 14 deletions(-) diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 20654424965..3b10331c5c8 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -15,17 +15,18 @@ import ( ) const ( - FilterConstraintHostVolumes = "missing compatible host volumes" - FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s" - FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s" - FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s" - FilterConstraintCSIVolumesLookupFailed = "CSI volume lookup failed" - FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s" - FilterConstraintCSIVolumeNoReadTemplate = "CSI volume %s is unschedulable or has exhausted its available reader claims" - FilterConstraintCSIVolumeNoWriteTemplate = "CSI volume %s is unschedulable or is read-only" - FilterConstraintCSIVolumeInUseTemplate = "CSI volume %s has exhausted its available writer claims" // - FilterConstraintDrivers = "missing drivers" - FilterConstraintDevices = "missing devices" + FilterConstraintHostVolumes = "missing compatible host volumes" + FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s" + FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s" + FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s" + FilterConstraintCSIVolumesLookupFailed = "CSI volume lookup failed" + FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s" + FilterConstraintCSIVolumeNoReadTemplate = "CSI volume %s is unschedulable or has exhausted its available reader claims" + FilterConstraintCSIVolumeNoWriteTemplate = "CSI volume %s is unschedulable or is read-only" + FilterConstraintCSIVolumeInUseTemplate = "CSI volume %s has exhausted its available writer claims" + FilterConstraintCSIVolumeGCdAllocationTemplate = "CSI volume %s has exhausted its available writer claims and is claimed by a garbage collected allocation %s; waiting for claim to be released" + FilterConstraintDrivers = "missing drivers" + FilterConstraintDevices = "missing devices" ) var ( @@ -320,11 +321,20 @@ func (c *CSIVolumeChecker) isFeasible(n *structs.Node) (bool, string) { return false, fmt.Sprintf(FilterConstraintCSIVolumeNoWriteTemplate, vol.ID) } if !vol.WriteFreeClaims() { - // Check the blocking allocations to see if they belong to this job for id := range vol.WriteAllocs { a, err := c.ctx.State().AllocByID(ws, id) - if err != nil || a == nil || - a.Namespace != c.namespace || a.JobID != c.jobID { + // the alloc for this blocking claim has been + // garbage collected but the volumewatcher hasn't + // finished releasing the claim (and possibly + // detaching the volume), so we need to block + // until it can be scheduled + if err != nil || a == nil { + return false, fmt.Sprintf( + FilterConstraintCSIVolumeGCdAllocationTemplate, vol.ID, id) + } else if a.Namespace != c.namespace || a.JobID != c.jobID { + // the blocking claim is for another live job + // so it's legitimately blocking more write + // claims return false, fmt.Sprintf( FilterConstraintCSIVolumeInUseTemplate, vol.ID) }