Skip to content

Commit

Permalink
csi: resolve invalid claim states on read
Browse files Browse the repository at this point in the history
It's currently possible for CSI volumes to be claimed by allocations
that no longer exist. This changeset asserts a reasonable state at
the state store level by registering these nil allocations as "past
claims" on any read. This will cause any pass through the periodic GC
or volumewatcher to trigger the unpublishing workflow for those claims.
  • Loading branch information
tgross committed Jan 20, 2022
1 parent 1471de4 commit ea44d74
Show file tree
Hide file tree
Showing 8 changed files with 309 additions and 6 deletions.
3 changes: 3 additions & 0 deletions .changelog/11890.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```bug
csi: Fixed a bug where garbage collected allocations could block new claims on a volume
```
3 changes: 3 additions & 0 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -782,6 +782,9 @@ NEXT_VOLUME:
continue
}

// TODO: move this logic into the denormalize logic so that a
// volume can just be checked for past claims!

// we only call the claim release RPC if the volume has claims
// that no longer have valid allocations. otherwise we'd send
// out a lot of do-nothing RPCs.
Expand Down
54 changes: 49 additions & 5 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2383,19 +2383,63 @@ func TestCoreScheduler_CSIVolumeClaimGC(t *testing.T) {
c := core.(*CoreScheduler)
require.NoError(c.csiVolumeClaimGC(gc))

// the volumewatcher will hit an error here because there's no
// path to the node. but we can't update the claim to bypass the
// client RPCs without triggering the volumewatcher's normal code
// path.
// TODO: the condition below means this test doesn't tell us
// much; we should be intercepting the claim request and verifying
// that we send the expected claims

// sending the GC claim will trigger the volumewatcher's normal
// code path. but the volumewatcher will hit an error here
// because there's no path to the node, so we shouldn't see
// the WriteClaims removed
require.Eventually(func() bool {
vol, _ := state.CSIVolumeByID(ws, ns, volID)
return len(vol.WriteClaims) == 1 &&
len(vol.WriteAllocs) == 1 &&
len(vol.PastClaims) == 0
len(vol.PastClaims) == 1
}, time.Second*1, 10*time.Millisecond, "claims were released unexpectedly")

}

func TestCoreScheduler_CSIBadState_ClaimGC(t *testing.T) {
t.Parallel()
require := require.New(t)

srv, shutdown := TestServer(t, func(c *Config) {
c.NumSchedulers = 0 // Prevent automatic dequeue
})

defer shutdown()
testutil.WaitForLeader(t, srv.RPC)

err := state.TestBadCSIState(t, srv.State())
require.NoError(err)

snap, err := srv.State().Snapshot()
require.NoError(err)
core := NewCoreScheduler(srv, snap)

index, _ := srv.State().LatestIndex()
index++
gc := srv.coreJobEval(structs.CoreJobForceGC, index)
c := core.(*CoreScheduler)
require.NoError(c.csiVolumeClaimGC(gc))

require.Eventually(func() bool {
vol, _ := srv.State().CSIVolumeByID(nil,
structs.DefaultNamespace, "csi-volume-nfs0")
if len(vol.PastClaims) != 2 {
return false
}
for _, claim := range vol.PastClaims {
if claim.State != structs.CSIVolumeClaimStateUnpublishing {
return false
}
}
return true
}, time.Second*1, 10*time.Millisecond, "invalid claims should be marked for GC")

}

func TestCoreScheduler_FailLoop(t *testing.T) {
t.Parallel()
require := require.New(t)
Expand Down
5 changes: 5 additions & 0 deletions nomad/csi_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ func (v *CSIVolume) Unpublish(args *structs.CSIVolumeUnpublishRequest, reply *st
}

NODE_DETACHED:
// TODO: shouldn't we only controller unpublish if there weren't any node claims left?
err = v.controllerUnpublishVolume(vol, claim)
if err != nil {
return err
Expand Down Expand Up @@ -620,6 +621,8 @@ func (v *CSIVolume) nodeUnpublishVolume(vol *structs.CSIVolume, claim *structs.C
// alloc on the node
allocIDs := []string{}
state := v.srv.fsm.State()
// TODO: we should never call this from here; it should be private
// to state_store and get called on any query instead
vol, err := state.CSIVolumeDenormalize(memdb.NewWatchSet(), vol)
if err != nil {
return err
Expand Down Expand Up @@ -708,6 +711,8 @@ func (v *CSIVolume) controllerUnpublishVolume(vol *structs.CSIVolume, claim *str
// we only send a controller detach if a Nomad client no longer has
// any claim to the volume, so we need to check the status of claimed
// allocations
// TODO: we shouldn't ever call this here; move it into the state_store
// and call it internally on every query
vol, err = state.CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
Expand Down
27 changes: 27 additions & 0 deletions nomad/state/state_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -853,6 +853,7 @@ func upsertNodeTxn(txn *txn, index uint64, node *structs.Node) error {
if err := txn.Insert("index", &IndexEntry{"nodes", index}); err != nil {
return fmt.Errorf("index update failed: %v", err)
}

if err := upsertNodeCSIPlugins(txn, node, index); err != nil {
return fmt.Errorf("csi plugin update failed: %v", err)
}
Expand Down Expand Up @@ -2513,6 +2514,18 @@ func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st
State: structs.CSIVolumeClaimStateTaken,
}
}
} else if _, ok := vol.PastClaims[id]; !ok {
// ensure that any allocs that have been GC'd since
// our last read are marked as past claims
vol.PastClaims[id] = &structs.CSIVolumeClaim{
AllocationID: id,
Mode: structs.CSIVolumeClaimRead,
State: structs.CSIVolumeClaimStateUnpublishing,
}
readClaim := vol.ReadClaims[id]
if readClaim != nil {
vol.PastClaims[id].NodeID = readClaim.NodeID
}
}
}

Expand All @@ -2531,6 +2544,20 @@ func (s *StateStore) CSIVolumeDenormalizeTxn(txn Txn, ws memdb.WatchSet, vol *st
State: structs.CSIVolumeClaimStateTaken,
}
}
} else if _, ok := vol.PastClaims[id]; !ok {
// ensure that any allocs that have been GC'd since
// our last read are marked as past claims

vol.PastClaims[id] = &structs.CSIVolumeClaim{
AllocationID: id,
Mode: structs.CSIVolumeClaimWrite,
State: structs.CSIVolumeClaimStateUnpublishing,
}
writeClaim := vol.WriteClaims[id]
if writeClaim != nil {
vol.PastClaims[id].NodeID = writeClaim.NodeID
}

}
}

Expand Down
187 changes: 187 additions & 0 deletions nomad/state/testing.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package state

import (
"math"
"testing"
"time"

"github.com/hashicorp/nomad/helper/testlog"
"github.com/hashicorp/nomad/helper/uuid"
Expand Down Expand Up @@ -124,3 +126,188 @@ func createTestCSIPlugin(s *StateStore, id string, requiresController bool) func
s.DeleteNode(structs.MsgTypeTestSetup, index, ids)
}
}

func TestBadCSIState(t testing.TB, store *StateStore) error {

pluginID := "org.democratic-csi.nfs"

controllerInfo := func(isHealthy bool) map[string]*structs.CSIInfo {
desc := "healthy"
if !isHealthy {
desc = "failed fingerprinting with error"
}
return map[string]*structs.CSIInfo{
pluginID: {
PluginID: pluginID,
AllocID: uuid.Generate(),
Healthy: isHealthy,
HealthDescription: desc,
RequiresControllerPlugin: true,
ControllerInfo: &structs.CSIControllerInfo{
SupportsReadOnlyAttach: true,
SupportsAttachDetach: true,
},
},
}
}

nodeInfo := func(nodeName string, isHealthy bool) map[string]*structs.CSIInfo {
desc := "healthy"
if !isHealthy {
desc = "failed fingerprinting with error"
}
return map[string]*structs.CSIInfo{
pluginID: {
PluginID: pluginID,
AllocID: uuid.Generate(),
Healthy: isHealthy,
HealthDescription: desc,
RequiresControllerPlugin: true,
NodeInfo: &structs.CSINodeInfo{
ID: nodeName,
MaxVolumes: math.MaxInt64,
RequiresNodeStageVolume: true,
},
},
}
}

nodes := make([]*structs.Node, 3)
for i := range nodes {
n := mock.Node()
n.Attributes["nomad.version"] = "1.2.4"
nodes[i] = n
}

nodes[0].CSIControllerPlugins = controllerInfo(true)
nodes[0].CSINodePlugins = nodeInfo("nomad-client0", true)

drainID := uuid.Generate()

// drained node
nodes[1].CSIControllerPlugins = controllerInfo(false)
nodes[1].CSINodePlugins = nodeInfo("nomad-client1", false)

nodes[1].LastDrain = &structs.DrainMetadata{
StartedAt: time.Now().Add(-10 * time.Minute),
UpdatedAt: time.Now().Add(-30 * time.Second),
Status: structs.DrainStatusComplete,
AccessorID: drainID,
}
nodes[1].SchedulingEligibility = structs.NodeSchedulingIneligible

// previously drained but now eligible
nodes[2].CSIControllerPlugins = controllerInfo(true)
nodes[2].CSINodePlugins = nodeInfo("nomad-client2", true)
nodes[2].LastDrain = &structs.DrainMetadata{
StartedAt: time.Now().Add(-15 * time.Minute),
UpdatedAt: time.Now().Add(-5 * time.Minute),
Status: structs.DrainStatusComplete,
AccessorID: drainID,
}
nodes[2].SchedulingEligibility = structs.NodeSchedulingEligible

// Insert nodes into the state store
index := uint64(999)
for _, n := range nodes {
index++
err := store.UpsertNode(structs.MsgTypeTestSetup, index, n)
if err != nil {
return err
}
}

allocID0 := uuid.Generate() // nil alloc
allocID2 := uuid.Generate() // nil alloc

alloc1 := mock.Alloc()
alloc1.ClientStatus = "complete"
alloc1.DesiredStatus = "stop"

// Insert allocs into the state store
err := store.UpsertAllocs(structs.MsgTypeTestSetup, index, []*structs.Allocation{alloc1})
if err != nil {
return err
}

vol := &structs.CSIVolume{
ID: "csi-volume-nfs0",
Name: "csi-volume-nfs0",
ExternalID: "csi-volume-nfs0",
Namespace: "default",
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
MountOptions: &structs.CSIMountOptions{
MountFlags: []string{"noatime"},
},
Context: map[string]string{
"node_attach_driver": "nfs",
"provisioner_driver": "nfs-client",
"server": "192.168.56.69",
},
Capacity: 0,
RequestedCapacityMin: 107374182,
RequestedCapacityMax: 107374182,
RequestedCapabilities: []*structs.CSIVolumeCapability{
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
},
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
},
{
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
AccessMode: structs.CSIVolumeAccessModeSingleNodeReader,
},
},
WriteAllocs: map[string]*structs.Allocation{
allocID0: nil,
alloc1.ID: nil,
allocID2: nil,
},
WriteClaims: map[string]*structs.CSIVolumeClaim{
allocID0: {
AllocationID: allocID0,
NodeID: nodes[0].ID,
Mode: structs.CSIVolumeClaimWrite,
AccessMode: structs.CSIVolumeAccessModeSingleNodeWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateTaken,
},
alloc1.ID: {
AllocationID: alloc1.ID,
NodeID: nodes[1].ID,
Mode: structs.CSIVolumeClaimWrite,
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateTaken,
},
allocID2: {
AllocationID: allocID2,
NodeID: nodes[2].ID,
Mode: structs.CSIVolumeClaimWrite,
AccessMode: structs.CSIVolumeAccessModeMultiNodeMultiWriter,
AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem,
State: structs.CSIVolumeClaimStateTaken,
},
},
Schedulable: true,
PluginID: pluginID,
Provider: pluginID,
ProviderVersion: "1.4.3",
ControllerRequired: true,
ControllersHealthy: 2,
ControllersExpected: 2,
NodesHealthy: 2,
NodesExpected: 0,
}

err = store.CSIVolumeRegister(index, []*structs.CSIVolume{vol})
if err != nil {
return err
}

return nil
}
29 changes: 29 additions & 0 deletions nomad/volumewatcher/volume_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,32 @@ func TestVolumeWatch_Reap(t *testing.T) {
require.NoError(err)
require.Len(vol.PastClaims, 2) // alloc claim + GC claim
}

func TestVolumeReapBadState(t *testing.T) {

store := state.TestStateStore(t)
err := state.TestBadCSIState(t, store)
require.NoError(t, err)
srv := &MockRPCServer{
state: store,
}

vol, err := srv.state.CSIVolumeByID(nil,
structs.DefaultNamespace, "csi-volume-nfs0")
require.NoError(t, err)
srv.state.CSIVolumeDenormalize(nil, vol)

ctx, exitFn := context.WithCancel(context.Background())
w := &volumeWatcher{
v: vol,
rpc: srv,
state: srv.State(),
ctx: ctx,
exitFn: exitFn,
logger: testlog.HCLogger(t),
}

err = w.volumeReapImpl(vol)
require.NoError(t, err)
require.Equal(t, 2, srv.countCSIUnpublish)
}
Loading

0 comments on commit ea44d74

Please sign in to comment.