diff --git a/.changelog/11890.txt b/.changelog/11890.txt new file mode 100644 index 00000000000..1074aa29cb4 --- /dev/null +++ b/.changelog/11890.txt @@ -0,0 +1,3 @@ +```release-note:bug +csi: Fixed a bug where garbage collected allocations could block new claims on a volume +``` diff --git a/nomad/core_sched.go b/nomad/core_sched.go index cffb6114ba2..4306783efcb 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -782,6 +782,10 @@ NEXT_VOLUME: continue } + // TODO(tgross): consider moving the TerminalStatus check into + // the denormalize volume logic so that we can just check the + // volume for past claims + // we only call the claim release RPC if the volume has claims // that no longer have valid allocations. otherwise we'd send // out a lot of do-nothing RPCs. diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index a19d4395b07..095975a31f6 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -2383,19 +2383,64 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) { c := core.(*CoreScheduler) require.NoError(c.csiVolumeClaimGC(gc)) - // the volumewatcher will hit an error here because there's no - // path to the node. but we can't update the claim to bypass the - // client RPCs without triggering the volumewatcher's normal code - // path. + // TODO(tgross): the condition below means this test doesn't tell + // us much; ideally we should be intercepting the claim request + // and verifying that we send the expected claims but we don't + // have test infra in place to do that for server RPCs + + // sending the GC claim will trigger the volumewatcher's normal + // code path. but the volumewatcher will hit an error here + // because there's no path to the node, so we shouldn't see + // the WriteClaims removed require.Eventually(func() bool { vol, _ := state.CSIVolumeByID(ws, ns, volID) return len(vol.WriteClaims) == 1 && len(vol.WriteAllocs) == 1 && - len(vol.PastClaims) == 0 + len(vol.PastClaims) == 1 }, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly") } +func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) { + t.Parallel() + require := require.New(t) + + srv, shutdown := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + + defer shutdown() + testutil.WaitForLeader(t, srv.RPC) + + err := state.TestBadCSIState(t, srv.State()) + require.NoError(err) + + snap, err := srv.State().Snapshot() + require.NoError(err) + core := NewCoreScheduler(srv, snap) + + index, _ := srv.State().LatestIndex() + index++ + gc := srv.coreJobEval(structs.CoreJobForceGC, index) + c := core.(*CoreScheduler) + require.NoError(c.csiVolumeClaimGC(gc)) + + require.Eventually(func() bool { + vol, _ := srv.State().CSIVolumeByID(nil, + structs.DefaultNamespace, "csi-volume-nfs0") + if len(vol.PastClaims) != 2 { + return false + } + for _, claim := range vol.PastClaims { + if claim.State != structs.CSIVolumeClaimStateUnpublishing { + return false + } + } + return true + }, time.Second*1, 10*time.Millisecond, "invalid claims should be marked for GC") + +} + func TestCoreScheduler_FailLoop(t *testing.T) { t.Parallel() require := require.New(t) diff --git a/nomad/state/state_store.go b/nomad/state/state_store.go index 7d8decfdd79..7551ad04535 100644 --- a/nomad/state/state_store.go +++ b/nomad/state/state_store.go @@ -2513,6 +2513,18 @@ func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st State: structs.CSIVolumeClaimStateTaken, } } + } else if _, ok := vol.PastClaims[id]; !ok { + // ensure that any allocs that have been GC'd since + // our last read are marked as past claims + vol.PastClaims[id] = &structs.CSIVolumeClaim{ + AllocationID: id, + Mode: structs.CSIVolumeClaimRead, + State: structs.CSIVolumeClaimStateUnpublishing, + } + readClaim := vol.ReadClaims[id] + if readClaim != nil { + vol.PastClaims[id].NodeID = readClaim.NodeID + } } } @@ -2531,6 +2543,20 @@ func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st State: structs.CSIVolumeClaimStateTaken, } } + } else if _, ok := vol.PastClaims[id]; !ok { + // ensure that any allocs that have been GC'd since + // our last read are marked as past claims + + vol.PastClaims[id] = &structs.CSIVolumeClaim{ + AllocationID: id, + Mode: structs.CSIVolumeClaimWrite, + State: structs.CSIVolumeClaimStateUnpublishing, + } + writeClaim := vol.WriteClaims[id] + if writeClaim != nil { + vol.PastClaims[id].NodeID = writeClaim.NodeID + } + } } diff --git a/nomad/state/testing.go b/nomad/state/testing.go index 460df609773..0d570a95ade 100644 --- a/nomad/state/testing.go +++ b/nomad/state/testing.go @@ -1,7 +1,9 @@ package state import ( + "math" "testing" + "time" "github.com/hashicorp/nomad/helper/testlog" "github.com/hashicorp/nomad/helper/uuid" @@ -124,3 +126,188 @@ func createTestCSIPlugin(s *StateStore, id string, requiresController bool) func s.DeleteNode(structs.MsgTypeTestSetup, index, ids) } } + +func TestBadCSIState(t testing.TB, store *StateStore) error { + + pluginID := "org.democratic-csi.nfs" + + controllerInfo := func(isHealthy bool) map[string]*structs.CSIInfo { + desc := "healthy" + if !isHealthy { + desc = "failed fingerprinting with error" + } + return map[string]*structs.CSIInfo{ + pluginID: { + PluginID: pluginID, + AllocID: uuid.Generate(), + Healthy: isHealthy, + HealthDescription: desc, + RequiresControllerPlugin: true, + ControllerInfo: &structs.CSIControllerInfo{ + SupportsReadOnlyAttach: true, + SupportsAttachDetach: true, + }, + }, + } + } + + nodeInfo := func(nodeName string, isHealthy bool) map[string]*structs.CSIInfo { + desc := "healthy" + if !isHealthy { + desc = "failed fingerprinting with error" + } + return map[string]*structs.CSIInfo{ + pluginID: { + PluginID: pluginID, + AllocID: uuid.Generate(), + Healthy: isHealthy, + HealthDescription: desc, + RequiresControllerPlugin: true, + NodeInfo: &structs.CSINodeInfo{ + ID: nodeName, + MaxVolumes: math.MaxInt64, + RequiresNodeStageVolume: true, + }, + }, + } + } + + nodes := make([]*structs.Node, 3) + for i := range nodes { + n := mock.Node() + n.Attributes["nomad.version"] = "1.2.4" + nodes[i] = n + } + + nodes[0].CSIControllerPlugins = controllerInfo(true) + nodes[0].CSINodePlugins = nodeInfo("nomad-client0", true) + + drainID := uuid.Generate() + + // drained node + nodes[1].CSIControllerPlugins = controllerInfo(false) + nodes[1].CSINodePlugins = nodeInfo("nomad-client1", false) + + nodes[1].LastDrain = &structs.DrainMetadata{ + StartedAt: time.Now().Add(-10 * time.Minute), + UpdatedAt: time.Now().Add(-30 * time.Second), + Status: structs.DrainStatusComplete, + AccessorID: drainID, + } + nodes[1].SchedulingEligibility = structs.NodeSchedulingIneligible + + // previously drained but now eligible + nodes[2].CSIControllerPlugins = controllerInfo(true) + nodes[2].CSINodePlugins = nodeInfo("nomad-client2", true) + nodes[2].LastDrain = &structs.DrainMetadata{ + StartedAt: time.Now().Add(-15 * time.Minute), + UpdatedAt: time.Now().Add(-5 * time.Minute), + Status: structs.DrainStatusComplete, + AccessorID: drainID, + } + nodes[2].SchedulingEligibility = structs.NodeSchedulingEligible + + // Insert nodes into the state store + index := uint64(999) + for _, n := range nodes { + index++ + err := store.UpsertNode(structs.MsgTypeTestSetup, index, n) + if err != nil { + return err + } + } + + allocID0 := uuid.Generate() // nil alloc + allocID2 := uuid.Generate() // nil alloc + + alloc1 := mock.Alloc() + alloc1.ClientStatus = "complete" + alloc1.DesiredStatus = "stop" + + // Insert allocs into the state store + err := store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1}) + if err != nil { + return err + } + + vol := &structs.CSIVolume{ + ID: "csi-volume-nfs0", + Name: "csi-volume-nfs0", + ExternalID: "csi-volume-nfs0", + Namespace: "default", + AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + MountOptions: &structs.CSIMountOptions{ + MountFlags: []string{"noatime"}, + }, + Context: map[string]string{ + "node_attach_driver": "nfs", + "provisioner_driver": "nfs-client", + "server": "192.168.56.69", + }, + Capacity: 0, + RequestedCapacityMin: 107374182, + RequestedCapacityMax: 107374182, + RequestedCapabilities: []*structs.CSIVolumeCapability{ + { + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + }, + { + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter, + }, + { + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + AccessMode: structs.CSIVolumeAccessModeSingleNodeReader, + }, + }, + WriteAllocs: map[string]*structs.Allocation{ + allocID0: nil, + alloc1.ID: nil, + allocID2: nil, + }, + WriteClaims: map[string]*structs.CSIVolumeClaim{ + allocID0: { + AllocationID: allocID0, + NodeID: nodes[0].ID, + Mode: structs.CSIVolumeClaimWrite, + AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + State: structs.CSIVolumeClaimStateTaken, + }, + alloc1.ID: { + AllocationID: alloc1.ID, + NodeID: nodes[1].ID, + Mode: structs.CSIVolumeClaimWrite, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + State: structs.CSIVolumeClaimStateTaken, + }, + allocID2: { + AllocationID: allocID2, + NodeID: nodes[2].ID, + Mode: structs.CSIVolumeClaimWrite, + AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter, + AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, + State: structs.CSIVolumeClaimStateTaken, + }, + }, + Schedulable: true, + PluginID: pluginID, + Provider: pluginID, + ProviderVersion: "1.4.3", + ControllerRequired: true, + ControllersHealthy: 2, + ControllersExpected: 2, + NodesHealthy: 2, + NodesExpected: 0, + } + + err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + if err != nil { + return err + } + + return nil +} diff --git a/nomad/volumewatcher/volume_watcher_test.go b/nomad/volumewatcher/volume_watcher_test.go index 848ca58b925..4e8a556a49c 100644 --- a/nomad/volumewatcher/volume_watcher_test.go +++ b/nomad/volumewatcher/volume_watcher_test.go @@ -75,3 +75,32 @@ func TestVolumeWatch_Reap(t *testing.T) { require.NoError(err) require.Len(vol.PastClaims, 2) // alloc claim + GC claim } + +func TestVolumeReapBadState(t *testing.T) { + + store := state.TestStateStore(t) + err := state.TestBadCSIState(t, store) + require.NoError(t, err) + srv := &MockRPCServer{ + state: store, + } + + vol, err := srv.state.CSIVolumeByID(nil, + structs.DefaultNamespace, "csi-volume-nfs0") + require.NoError(t, err) + srv.state.CSIVolumeDenormalize(nil, vol) + + ctx, exitFn := context.WithCancel(context.Background()) + w := &volumeWatcher{ + v: vol, + rpc: srv, + state: srv.State(), + ctx: ctx, + exitFn: exitFn, + logger: testlog.HCLogger(t), + } + + err = w.volumeReapImpl(vol) + require.NoError(t, err) + require.Equal(t, 2, srv.countCSIUnpublish) +} diff --git a/nomad/volumewatcher/volumes_watcher_test.go b/nomad/volumewatcher/volumes_watcher_test.go index 7f0365be351..2271c2f2036 100644 --- a/nomad/volumewatcher/volumes_watcher_test.go +++ b/nomad/volumewatcher/volumes_watcher_test.go @@ -70,10 +70,15 @@ func TestVolumeWatch_LeadershipTransition(t *testing.T) { alloc.ClientStatus = structs.AllocClientStatusComplete vol := testVolume(plugin, alloc, node.ID) + index++ + err := srv.State().UpsertAllocs(structs.MsgTypeTestSetup, index, + []*structs.Allocation{alloc}) + require.NoError(err) + watcher.SetEnabled(true, srv.State(), "") index++ - err := srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) + err = srv.State().CSIVolumeRegister(index, []*structs.CSIVolume{vol}) require.NoError(err) // we should get or start up a watcher when we get an update for diff --git a/scheduler/feasible.go b/scheduler/feasible.go index 20654424965..3b10331c5c8 100644 --- a/scheduler/feasible.go +++ b/scheduler/feasible.go @@ -15,17 +15,18 @@ import ( ) const ( - FilterConstraintHostVolumes = "missing compatible host volumes" - FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s" - FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s" - FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s" - FilterConstraintCSIVolumesLookupFailed = "CSI volume lookup failed" - FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s" - FilterConstraintCSIVolumeNoReadTemplate = "CSI volume %s is unschedulable or has exhausted its available reader claims" - FilterConstraintCSIVolumeNoWriteTemplate = "CSI volume %s is unschedulable or is read-only" - FilterConstraintCSIVolumeInUseTemplate = "CSI volume %s has exhausted its available writer claims" // - FilterConstraintDrivers = "missing drivers" - FilterConstraintDevices = "missing devices" + FilterConstraintHostVolumes = "missing compatible host volumes" + FilterConstraintCSIPluginTemplate = "CSI plugin %s is missing from client %s" + FilterConstraintCSIPluginUnhealthyTemplate = "CSI plugin %s is unhealthy on client %s" + FilterConstraintCSIPluginMaxVolumesTemplate = "CSI plugin %s has the maximum number of volumes on client %s" + FilterConstraintCSIVolumesLookupFailed = "CSI volume lookup failed" + FilterConstraintCSIVolumeNotFoundTemplate = "missing CSI Volume %s" + FilterConstraintCSIVolumeNoReadTemplate = "CSI volume %s is unschedulable or has exhausted its available reader claims" + FilterConstraintCSIVolumeNoWriteTemplate = "CSI volume %s is unschedulable or is read-only" + FilterConstraintCSIVolumeInUseTemplate = "CSI volume %s has exhausted its available writer claims" + FilterConstraintCSIVolumeGCdAllocationTemplate = "CSI volume %s has exhausted its available writer claims and is claimed by a garbage collected allocation %s; waiting for claim to be released" + FilterConstraintDrivers = "missing drivers" + FilterConstraintDevices = "missing devices" ) var ( @@ -320,11 +321,20 @@ func (c *CSIVolumeChecker) isFeasible(n *structs.Node) (bool, string) { return false, fmt.Sprintf(FilterConstraintCSIVolumeNoWriteTemplate, vol.ID) } if !vol.WriteFreeClaims() { - // Check the blocking allocations to see if they belong to this job for id := range vol.WriteAllocs { a, err := c.ctx.State().AllocByID(ws, id) - if err != nil || a == nil || - a.Namespace != c.namespace || a.JobID != c.jobID { + // the alloc for this blocking claim has been + // garbage collected but the volumewatcher hasn't + // finished releasing the claim (and possibly + // detaching the volume), so we need to block + // until it can be scheduled + if err != nil || a == nil { + return false, fmt.Sprintf( + FilterConstraintCSIVolumeGCdAllocationTemplate, vol.ID, id) + } else if a.Namespace != c.namespace || a.JobID != c.jobID { + // the blocking claim is for another live job + // so it's legitimately blocking more write + // claims return false, fmt.Sprintf( FilterConstraintCSIVolumeInUseTemplate, vol.ID) }