From 588acb55128b6f08db25fdf1016044f1e3e3b9b1 Mon Sep 17 00:00:00 2001 From: Tim Gross Date: Tue, 28 Apr 2020 15:53:30 -0400 Subject: [PATCH] csi: remove old volume claim GC mechanism The volume claim GC mechanism now makes an empty claim RPC for the volume to trigger an index bump. That in turn unblocks the blocking query in the volume watcher so it can assess which claims can be released for a volume. --- nomad/core_sched.go | 212 ++-------------------------- nomad/core_sched_test.go | 267 ------------------------------------ nomad/interfaces.go | 11 -- nomad/job_endpoint.go | 8 +- nomad/node_endpoint.go | 2 +- nomad/node_endpoint_test.go | 2 +- 6 files changed, 17 insertions(+), 485 deletions(-) delete mode 100644 nomad/interfaces.go diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 0b1d7e62420..08f65e5bf82 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -8,9 +8,7 @@ import ( log "github.com/hashicorp/go-hclog" memdb "github.com/hashicorp/go-memdb" - multierror "github.com/hashicorp/go-multierror" version "github.com/hashicorp/go-version" - cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/scheduler" @@ -711,212 +709,30 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time, return timeDiff > interval.Nanoseconds() } +// TODO: we need a periodic trigger to iterate over all the volumes and split +// them up into separate work items, same as we do for jobs. + // csiVolumeClaimGC is used to garbage collect CSI volume claims func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error { - c.logger.Trace("garbage collecting unclaimed CSI volume claims") + c.logger.Trace("garbage collecting unclaimed CSI volume claims", "eval.JobID", eval.JobID) // Volume ID smuggled in with the eval's own JobID evalVolID := strings.Split(eval.JobID, ":") - if len(evalVolID) != 3 { + + // COMPAT(1.0): 0.11.0 shipped with 3 fields. tighten this check to len == 2 + if len(evalVolID) < 2 { c.logger.Error("volume gc called without volID") return nil } volID := evalVolID[1] - runningAllocs := evalVolID[2] == "purge" - return volumeClaimReap(c.srv, volID, eval.Namespace, - c.srv.config.Region, eval.LeaderACL, runningAllocs) -} - -func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string, runningAllocs bool) error { - - ws := memdb.NewWatchSet() - - vol, err := srv.State().CSIVolumeByID(ws, namespace, volID) - if err != nil { - return err - } - if vol == nil { - return nil - } - vol, err = srv.State().CSIVolumeDenormalize(ws, vol) - if err != nil { - return err - } - - plug, err := srv.State().CSIPluginByID(ws, vol.PluginID) - if err != nil { - return err - } - - nodeClaims := collectClaimsToGCImpl(vol, runningAllocs) - - var result *multierror.Error - for _, claim := range vol.PastClaims { - nodeClaims, err = volumeClaimReapImpl(srv, - &volumeClaimReapArgs{ - vol: vol, - plug: plug, - claim: claim, - namespace: namespace, - region: region, - leaderACL: leaderACL, - nodeClaims: nodeClaims, - }, - ) - if err != nil { - result = multierror.Append(result, err) - continue - } + req := &structs.CSIVolumeClaimRequest{ + VolumeID: volID, + Claim: structs.CSIVolumeClaimRelease, } - return result.ErrorOrNil() - -} + req.Namespace = eval.Namespace + req.Region = c.srv.config.Region -func collectClaimsToGCImpl(vol *structs.CSIVolume, runningAllocs bool) map[string]int { - nodeClaims := map[string]int{} // node IDs -> count - - collectFunc := func(allocs map[string]*structs.Allocation, - claims map[string]*structs.CSIVolumeClaim) { - - for allocID, alloc := range allocs { - claim, ok := claims[allocID] - if !ok { - // COMPAT(1.0): the CSIVolumeClaim fields were added - // after 0.11.1, so claims made before that may be - // missing this value. note that we'll have non-nil - // allocs here because we called denormalize on the - // value. - claim = &structs.CSIVolumeClaim{ - AllocationID: allocID, - NodeID: alloc.NodeID, - State: structs.CSIVolumeClaimStateTaken, - } - } - nodeClaims[claim.NodeID]++ - if runningAllocs || alloc.Terminated() { - // only overwrite the PastClaim if this is new, - // so that we can track state between subsequent calls - if _, exists := vol.PastClaims[claim.AllocationID]; !exists { - claim.State = structs.CSIVolumeClaimStateTaken - vol.PastClaims[claim.AllocationID] = claim - } - } - } - } - - collectFunc(vol.WriteAllocs, vol.WriteClaims) - collectFunc(vol.ReadAllocs, vol.ReadClaims) - return nodeClaims -} - -type volumeClaimReapArgs struct { - vol *structs.CSIVolume - plug *structs.CSIPlugin - claim *structs.CSIVolumeClaim - region string - namespace string - leaderACL string - nodeClaims map[string]int // node IDs -> count -} - -func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]int, error) { - vol := args.vol - claim := args.claim - - var err error - var nReq *cstructs.ClientCSINodeDetachVolumeRequest - - checkpoint := func(claimState structs.CSIVolumeClaimState) error { - req := &structs.CSIVolumeClaimRequest{ - VolumeID: vol.ID, - AllocationID: claim.AllocationID, - Claim: structs.CSIVolumeClaimRelease, - WriteRequest: structs.WriteRequest{ - Region: args.region, - Namespace: args.namespace, - AuthToken: args.leaderACL, - }, - } - return srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{}) - } - - // previous checkpoints may have set the past claim state already. - // in practice we should never see CSIVolumeClaimStateControllerDetached - // but having an option for the state makes it easy to add a checkpoint - // in a backwards compatible way if we need one later - switch claim.State { - case structs.CSIVolumeClaimStateNodeDetached: - goto NODE_DETACHED - case structs.CSIVolumeClaimStateControllerDetached: - goto RELEASE_CLAIM - case structs.CSIVolumeClaimStateReadyToFree: - goto RELEASE_CLAIM - } - - // (1) NodePublish / NodeUnstage must be completed before controller - // operations or releasing the claim. - nReq = &cstructs.ClientCSINodeDetachVolumeRequest{ - PluginID: args.plug.ID, - VolumeID: vol.ID, - ExternalID: vol.RemoteID(), - AllocID: claim.AllocationID, - NodeID: claim.NodeID, - AttachmentMode: vol.AttachmentMode, - AccessMode: vol.AccessMode, - ReadOnly: claim.Mode == structs.CSIVolumeClaimRead, - } - err = srv.RPC("ClientCSI.NodeDetachVolume", nReq, - &cstructs.ClientCSINodeDetachVolumeResponse{}) - if err != nil { - return args.nodeClaims, err - } - err = checkpoint(structs.CSIVolumeClaimStateNodeDetached) - if err != nil { - return args.nodeClaims, err - } - -NODE_DETACHED: - args.nodeClaims[claim.NodeID]-- - - // (2) we only emit the controller unpublish if no other allocs - // on the node need it, but we also only want to make this - // call at most once per node - if vol.ControllerRequired && args.nodeClaims[claim.NodeID] < 1 { - - // we need to get the CSI Node ID, which is not the same as - // the Nomad Node ID - ws := memdb.NewWatchSet() - targetNode, err := srv.State().NodeByID(ws, claim.NodeID) - if err != nil { - return args.nodeClaims, err - } - if targetNode == nil { - return args.nodeClaims, fmt.Errorf("%s: %s", - structs.ErrUnknownNodePrefix, claim.NodeID) - } - targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID] - if !ok { - return args.nodeClaims, fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID) - } - - cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{ - VolumeID: vol.RemoteID(), - ClientCSINodeID: targetCSIInfo.NodeInfo.ID, - } - cReq.PluginID = args.plug.ID - err = srv.RPC("ClientCSI.ControllerDetachVolume", cReq, - &cstructs.ClientCSIControllerDetachVolumeResponse{}) - if err != nil { - return args.nodeClaims, err - } - } - -RELEASE_CLAIM: - // (3) release the claim from the state store, allowing it to be rescheduled - err = checkpoint(structs.CSIVolumeClaimStateReadyToFree) - if err != nil { - return args.nodeClaims, err - } - return args.nodeClaims, nil + err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{}) + return err } diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index 819e0908ddb..70b500a82bc 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -6,10 +6,8 @@ import ( "time" memdb "github.com/hashicorp/go-memdb" - cstructs "github.com/hashicorp/nomad/client/structs" "github.com/hashicorp/nomad/helper/uuid" "github.com/hashicorp/nomad/nomad/mock" - "github.com/hashicorp/nomad/nomad/state" "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/stretchr/testify/assert" @@ -2195,268 +2193,3 @@ func TestAllocation_GCEligible(t *testing.T) { alloc.ClientStatus = structs.AllocClientStatusComplete require.True(allocGCEligible(alloc, nil, time.Now(), 1000)) } - -func TestCSI_GCVolumeClaims_Collection(t *testing.T) { - t.Parallel() - srv, shutdownSrv := TestServer(t, func(c *Config) { c.NumSchedulers = 0 }) - defer shutdownSrv() - testutil.WaitForLeader(t, srv.RPC) - - state := srv.fsm.State() - ws := memdb.NewWatchSet() - index := uint64(100) - - // Create a client node, plugin, and volume - node := mock.Node() - node.Attributes["nomad.version"] = "0.11.0" // client RPCs not supported on early version - node.CSINodePlugins = map[string]*structs.CSIInfo{ - "csi-plugin-example": { - PluginID: "csi-plugin-example", - Healthy: true, - RequiresControllerPlugin: true, - NodeInfo: &structs.CSINodeInfo{}, - }, - } - node.CSIControllerPlugins = map[string]*structs.CSIInfo{ - "csi-plugin-example": { - PluginID: "csi-plugin-example", - Healthy: true, - RequiresControllerPlugin: true, - ControllerInfo: &structs.CSIControllerInfo{ - SupportsReadOnlyAttach: true, - SupportsAttachDetach: true, - SupportsListVolumes: true, - SupportsListVolumesAttachedNodes: false, - }, - }, - } - err := state.UpsertNode(99, node) - require.NoError(t, err) - volId0 := uuid.Generate() - ns := structs.DefaultNamespace - vols := []*structs.CSIVolume{{ - ID: volId0, - Namespace: ns, - PluginID: "csi-plugin-example", - AccessMode: structs.CSIVolumeAccessModeMultiNodeSingleWriter, - AttachmentMode: structs.CSIVolumeAttachmentModeFilesystem, - }} - - err = state.CSIVolumeRegister(index, vols) - index++ - require.NoError(t, err) - vol, err := state.CSIVolumeByID(ws, ns, volId0) - - require.NoError(t, err) - require.True(t, vol.ControllerRequired) - require.Len(t, vol.ReadAllocs, 0) - require.Len(t, vol.WriteAllocs, 0) - - // Create a job with 2 allocations - job := mock.Job() - job.TaskGroups[0].Volumes = map[string]*structs.VolumeRequest{ - "_": { - Name: "someVolume", - Type: structs.VolumeTypeCSI, - Source: volId0, - ReadOnly: false, - }, - } - err = state.UpsertJob(index, job) - index++ - require.NoError(t, err) - - alloc1 := mock.Alloc() - alloc1.JobID = job.ID - alloc1.NodeID = node.ID - err = state.UpsertJobSummary(index, mock.JobSummary(alloc1.JobID)) - index++ - require.NoError(t, err) - alloc1.TaskGroup = job.TaskGroups[0].Name - - alloc2 := mock.Alloc() - alloc2.JobID = job.ID - alloc2.NodeID = node.ID - err = state.UpsertJobSummary(index, mock.JobSummary(alloc2.JobID)) - index++ - require.NoError(t, err) - alloc2.TaskGroup = job.TaskGroups[0].Name - - err = state.UpsertAllocs(104, []*structs.Allocation{alloc1, alloc2}) - require.NoError(t, err) - - // Claim the volumes and verify the claims were set - err = state.CSIVolumeClaim(index, ns, volId0, &structs.CSIVolumeClaim{ - AllocationID: alloc1.ID, - NodeID: alloc1.NodeID, - Mode: structs.CSIVolumeClaimWrite, - }) - index++ - require.NoError(t, err) - - err = state.CSIVolumeClaim(index, ns, volId0, &structs.CSIVolumeClaim{ - AllocationID: alloc2.ID, - NodeID: alloc2.NodeID, - Mode: structs.CSIVolumeClaimRead, - }) - index++ - require.NoError(t, err) - - vol, err = state.CSIVolumeByID(ws, ns, volId0) - require.NoError(t, err) - require.Len(t, vol.ReadAllocs, 1) - require.Len(t, vol.WriteAllocs, 1) - - // Update both allocs as failed/terminated - alloc1.ClientStatus = structs.AllocClientStatusFailed - alloc2.ClientStatus = structs.AllocClientStatusFailed - err = state.UpdateAllocsFromClient(index, []*structs.Allocation{alloc1, alloc2}) - require.NoError(t, err) - - vol, err = state.CSIVolumeDenormalize(ws, vol) - require.NoError(t, err) - - nodeClaims := collectClaimsToGCImpl(vol, false) - require.Equal(t, nodeClaims[node.ID], 2) - require.Len(t, vol.PastClaims, 2) -} - -func TestCSI_GCVolumeClaims_Reap(t *testing.T) { - t.Parallel() - require := require.New(t) - - s, shutdownSrv := TestServer(t, func(c *Config) { c.NumSchedulers = 0 }) - defer shutdownSrv() - testutil.WaitForLeader(t, s.RPC) - - node := mock.Node() - plugin := mock.CSIPlugin() - vol := mock.CSIVolume(plugin) - alloc := mock.Alloc() - - cases := []struct { - Name string - ClaimsCount map[string]int - ControllerRequired bool - ExpectedErr string - ExpectedCount int - ExpectedClaimsCount int - ExpectedNodeDetachVolumeCount int - ExpectedControllerDetachVolumeCount int - ExpectedVolumeClaimCount int - srv *MockRPCServer - }{ - { - Name: "NodeDetachVolume fails", - ClaimsCount: map[string]int{node.ID: 1}, - ControllerRequired: true, - ExpectedErr: "node plugin missing", - ExpectedClaimsCount: 1, - ExpectedNodeDetachVolumeCount: 1, - srv: &MockRPCServer{ - state: s.State(), - nextCSINodeDetachVolumeError: fmt.Errorf("node plugin missing"), - }, - }, - { - Name: "ControllerDetachVolume no controllers", - ClaimsCount: map[string]int{node.ID: 1}, - ControllerRequired: true, - ExpectedErr: fmt.Sprintf("Unknown node: %s", node.ID), - ExpectedClaimsCount: 0, - ExpectedNodeDetachVolumeCount: 1, - ExpectedControllerDetachVolumeCount: 0, - ExpectedVolumeClaimCount: 1, - srv: &MockRPCServer{ - state: s.State(), - }, - }, - { - Name: "ControllerDetachVolume node-only", - ClaimsCount: map[string]int{node.ID: 1}, - ControllerRequired: false, - ExpectedClaimsCount: 0, - ExpectedNodeDetachVolumeCount: 1, - ExpectedControllerDetachVolumeCount: 0, - ExpectedVolumeClaimCount: 2, - srv: &MockRPCServer{ - state: s.State(), - }, - }, - } - - for _, tc := range cases { - t.Run(tc.Name, func(t *testing.T) { - vol.ControllerRequired = tc.ControllerRequired - claim := &structs.CSIVolumeClaim{ - AllocationID: alloc.ID, - NodeID: node.ID, - State: structs.CSIVolumeClaimStateTaken, - Mode: structs.CSIVolumeClaimRead, - } - nodeClaims, err := volumeClaimReapImpl(tc.srv, &volumeClaimReapArgs{ - vol: vol, - plug: plugin, - claim: claim, - region: "global", - namespace: "default", - leaderACL: "not-in-use", - nodeClaims: tc.ClaimsCount, - }) - if tc.ExpectedErr != "" { - require.EqualError(err, tc.ExpectedErr) - } else { - require.NoError(err) - } - require.Equal(tc.ExpectedClaimsCount, - nodeClaims[claim.NodeID], "expected claims remaining") - require.Equal(tc.ExpectedNodeDetachVolumeCount, - tc.srv.countCSINodeDetachVolume, "node detach RPC count") - require.Equal(tc.ExpectedControllerDetachVolumeCount, - tc.srv.countCSIControllerDetachVolume, "controller detach RPC count") - require.Equal(tc.ExpectedVolumeClaimCount, - tc.srv.countCSIVolumeClaim, "volume claim RPC count") - }) - } -} - -type MockRPCServer struct { - state *state.StateStore - - // mock responses for ClientCSI.NodeDetachVolume - nextCSINodeDetachVolumeResponse *cstructs.ClientCSINodeDetachVolumeResponse - nextCSINodeDetachVolumeError error - countCSINodeDetachVolume int - - // mock responses for ClientCSI.ControllerDetachVolume - nextCSIControllerDetachVolumeResponse *cstructs.ClientCSIControllerDetachVolumeResponse - nextCSIControllerDetachVolumeError error - countCSIControllerDetachVolume int - - // mock responses for CSI.VolumeClaim - nextCSIVolumeClaimResponse *structs.CSIVolumeClaimResponse - nextCSIVolumeClaimError error - countCSIVolumeClaim int -} - -func (srv *MockRPCServer) RPC(method string, args interface{}, reply interface{}) error { - switch method { - case "ClientCSI.NodeDetachVolume": - reply = srv.nextCSINodeDetachVolumeResponse - srv.countCSINodeDetachVolume++ - return srv.nextCSINodeDetachVolumeError - case "ClientCSI.ControllerDetachVolume": - reply = srv.nextCSIControllerDetachVolumeResponse - srv.countCSIControllerDetachVolume++ - return srv.nextCSIControllerDetachVolumeError - case "CSIVolume.Claim": - reply = srv.nextCSIVolumeClaimResponse - srv.countCSIVolumeClaim++ - return srv.nextCSIVolumeClaimError - default: - return fmt.Errorf("unexpected method %q passed to mock", method) - } - -} - -func (srv *MockRPCServer) State() *state.StateStore { return srv.state } diff --git a/nomad/interfaces.go b/nomad/interfaces.go deleted file mode 100644 index 4dc266d8b80..00000000000 --- a/nomad/interfaces.go +++ /dev/null @@ -1,11 +0,0 @@ -package nomad - -import "github.com/hashicorp/nomad/nomad/state" - -// RPCServer is a minimal interface of the Server, intended as -// an aid for testing logic surrounding server-to-server or -// server-to-client RPC calls -type RPCServer interface { - RPC(method string, args interface{}, reply interface{}) error - State() *state.StateStore -} diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index a451817fb58..3cbd3f204db 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -737,19 +737,13 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD for _, vol := range volumesToGC { // we have to build this eval by hand rather than calling srv.CoreJob // here because we need to use the volume's namespace - - runningAllocs := ":ok" - if args.Purge { - runningAllocs = ":purge" - } - eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: job.Namespace, Priority: structs.CoreJobPriority, Type: structs.JobTypeCore, TriggeredBy: structs.EvalTriggerAllocStop, - JobID: structs.CoreJobCSIVolumeClaimGC + ":" + vol.Source + runningAllocs, + JobID: structs.CoreJobCSIVolumeClaimGC + ":" + vol.Source, LeaderACL: j.srv.getLeaderAcl(), Status: structs.EvalStatusPending, CreateTime: now, diff --git a/nomad/node_endpoint.go b/nomad/node_endpoint.go index fcfbcfcc232..7308c00aa63 100644 --- a/nomad/node_endpoint.go +++ b/nomad/node_endpoint.go @@ -1149,7 +1149,7 @@ func (n *Node) UpdateAlloc(args *structs.AllocUpdateRequest, reply *structs.Gene Priority: structs.CoreJobPriority, Type: structs.JobTypeCore, TriggeredBy: structs.EvalTriggerAllocStop, - JobID: structs.CoreJobCSIVolumeClaimGC + ":" + volAndNamespace[0] + ":no", + JobID: structs.CoreJobCSIVolumeClaimGC + ":" + volAndNamespace[0], LeaderACL: n.srv.getLeaderAcl(), Status: structs.EvalStatusPending, CreateTime: now.UTC().UnixNano(), diff --git a/nomad/node_endpoint_test.go b/nomad/node_endpoint_test.go index c1d54ebc849..e687614409a 100644 --- a/nomad/node_endpoint_test.go +++ b/nomad/node_endpoint_test.go @@ -2414,7 +2414,7 @@ func TestClientEndpoint_UpdateAlloc_UnclaimVolumes(t *testing.T) { // Verify the eval for the claim GC was emitted // Lookup the evaluations - eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+volId0+":no") + eval, err := state.EvalsByJob(ws, job.Namespace, structs.CoreJobCSIVolumeClaimGC+":"+volId0) require.NotNil(t, eval) require.Nil(t, err) }