Skip to content

Commit

Permalink
csi: remove old volume claim GC mechanism
Browse files Browse the repository at this point in the history
The volume claim GC mechanism now makes an empty claim RPC for the
volume to trigger an index bump. That in turn unblocks the blocking
query in the volume watcher so it can assess which claims can be
released for a volume.
  • Loading branch information
tgross committed Apr 28, 2020
1 parent b158a1f commit 588acb5
Show file tree
Hide file tree
Showing 6 changed files with 17 additions and 485 deletions.
212 changes: 14 additions & 198 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (

log "github.com/hashicorp/go-hclog"
memdb "github.com/hashicorp/go-memdb"
multierror "github.com/hashicorp/go-multierror"
version "github.com/hashicorp/go-version"
cstructs "github.com/hashicorp/nomad/client/structs"
"github.com/hashicorp/nomad/nomad/state"
"github.com/hashicorp/nomad/nomad/structs"
"github.com/hashicorp/nomad/scheduler"
Expand Down Expand Up @@ -711,212 +709,30 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time,
return timeDiff > interval.Nanoseconds()
}

// TODO: we need a periodic trigger to iterate over all the volumes and split
// them up into separate work items, same as we do for jobs.

// csiVolumeClaimGC is used to garbage collect CSI volume claims
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {
c.logger.Trace("garbage collecting unclaimed CSI volume claims")
c.logger.Trace("garbage collecting unclaimed CSI volume claims", "eval.JobID", eval.JobID)

// Volume ID smuggled in with the eval's own JobID
evalVolID := strings.Split(eval.JobID, ":")
if len(evalVolID) != 3 {

// COMPAT(1.0): 0.11.0 shipped with 3 fields. tighten this check to len == 2
if len(evalVolID) < 2 {
c.logger.Error("volume gc called without volID")
return nil
}

volID := evalVolID[1]
runningAllocs := evalVolID[2] == "purge"
return volumeClaimReap(c.srv, volID, eval.Namespace,
c.srv.config.Region, eval.LeaderACL, runningAllocs)
}

func volumeClaimReap(srv RPCServer, volID, namespace, region, leaderACL string, runningAllocs bool) error {

ws := memdb.NewWatchSet()

vol, err := srv.State().CSIVolumeByID(ws, namespace, volID)
if err != nil {
return err
}
if vol == nil {
return nil
}
vol, err = srv.State().CSIVolumeDenormalize(ws, vol)
if err != nil {
return err
}

plug, err := srv.State().CSIPluginByID(ws, vol.PluginID)
if err != nil {
return err
}

nodeClaims := collectClaimsToGCImpl(vol, runningAllocs)

var result *multierror.Error
for _, claim := range vol.PastClaims {
nodeClaims, err = volumeClaimReapImpl(srv,
&volumeClaimReapArgs{
vol: vol,
plug: plug,
claim: claim,
namespace: namespace,
region: region,
leaderACL: leaderACL,
nodeClaims: nodeClaims,
},
)
if err != nil {
result = multierror.Append(result, err)
continue
}
req := &structs.CSIVolumeClaimRequest{
VolumeID: volID,
Claim: structs.CSIVolumeClaimRelease,
}
return result.ErrorOrNil()

}
req.Namespace = eval.Namespace
req.Region = c.srv.config.Region

func collectClaimsToGCImpl(vol *structs.CSIVolume, runningAllocs bool) map[string]int {
nodeClaims := map[string]int{} // node IDs -> count

collectFunc := func(allocs map[string]*structs.Allocation,
claims map[string]*structs.CSIVolumeClaim) {

for allocID, alloc := range allocs {
claim, ok := claims[allocID]
if !ok {
// COMPAT(1.0): the CSIVolumeClaim fields were added
// after 0.11.1, so claims made before that may be
// missing this value. note that we'll have non-nil
// allocs here because we called denormalize on the
// value.
claim = &structs.CSIVolumeClaim{
AllocationID: allocID,
NodeID: alloc.NodeID,
State: structs.CSIVolumeClaimStateTaken,
}
}
nodeClaims[claim.NodeID]++
if runningAllocs || alloc.Terminated() {
// only overwrite the PastClaim if this is new,
// so that we can track state between subsequent calls
if _, exists := vol.PastClaims[claim.AllocationID]; !exists {
claim.State = structs.CSIVolumeClaimStateTaken
vol.PastClaims[claim.AllocationID] = claim
}
}
}
}

collectFunc(vol.WriteAllocs, vol.WriteClaims)
collectFunc(vol.ReadAllocs, vol.ReadClaims)
return nodeClaims
}

type volumeClaimReapArgs struct {
vol *structs.CSIVolume
plug *structs.CSIPlugin
claim *structs.CSIVolumeClaim
region string
namespace string
leaderACL string
nodeClaims map[string]int // node IDs -> count
}

func volumeClaimReapImpl(srv RPCServer, args *volumeClaimReapArgs) (map[string]int, error) {
vol := args.vol
claim := args.claim

var err error
var nReq *cstructs.ClientCSINodeDetachVolumeRequest

checkpoint := func(claimState structs.CSIVolumeClaimState) error {
req := &structs.CSIVolumeClaimRequest{
VolumeID: vol.ID,
AllocationID: claim.AllocationID,
Claim: structs.CSIVolumeClaimRelease,
WriteRequest: structs.WriteRequest{
Region: args.region,
Namespace: args.namespace,
AuthToken: args.leaderACL,
},
}
return srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
}

// previous checkpoints may have set the past claim state already.
// in practice we should never see CSIVolumeClaimStateControllerDetached
// but having an option for the state makes it easy to add a checkpoint
// in a backwards compatible way if we need one later
switch claim.State {
case structs.CSIVolumeClaimStateNodeDetached:
goto NODE_DETACHED
case structs.CSIVolumeClaimStateControllerDetached:
goto RELEASE_CLAIM
case structs.CSIVolumeClaimStateReadyToFree:
goto RELEASE_CLAIM
}

// (1) NodePublish / NodeUnstage must be completed before controller
// operations or releasing the claim.
nReq = &cstructs.ClientCSINodeDetachVolumeRequest{
PluginID: args.plug.ID,
VolumeID: vol.ID,
ExternalID: vol.RemoteID(),
AllocID: claim.AllocationID,
NodeID: claim.NodeID,
AttachmentMode: vol.AttachmentMode,
AccessMode: vol.AccessMode,
ReadOnly: claim.Mode == structs.CSIVolumeClaimRead,
}
err = srv.RPC("ClientCSI.NodeDetachVolume", nReq,
&cstructs.ClientCSINodeDetachVolumeResponse{})
if err != nil {
return args.nodeClaims, err
}
err = checkpoint(structs.CSIVolumeClaimStateNodeDetached)
if err != nil {
return args.nodeClaims, err
}

NODE_DETACHED:
args.nodeClaims[claim.NodeID]--

// (2) we only emit the controller unpublish if no other allocs
// on the node need it, but we also only want to make this
// call at most once per node
if vol.ControllerRequired && args.nodeClaims[claim.NodeID] < 1 {

// we need to get the CSI Node ID, which is not the same as
// the Nomad Node ID
ws := memdb.NewWatchSet()
targetNode, err := srv.State().NodeByID(ws, claim.NodeID)
if err != nil {
return args.nodeClaims, err
}
if targetNode == nil {
return args.nodeClaims, fmt.Errorf("%s: %s",
structs.ErrUnknownNodePrefix, claim.NodeID)
}
targetCSIInfo, ok := targetNode.CSINodePlugins[args.plug.ID]
if !ok {
return args.nodeClaims, fmt.Errorf("Failed to find NodeInfo for node: %s", targetNode.ID)
}

cReq := &cstructs.ClientCSIControllerDetachVolumeRequest{
VolumeID: vol.RemoteID(),
ClientCSINodeID: targetCSIInfo.NodeInfo.ID,
}
cReq.PluginID = args.plug.ID
err = srv.RPC("ClientCSI.ControllerDetachVolume", cReq,
&cstructs.ClientCSIControllerDetachVolumeResponse{})
if err != nil {
return args.nodeClaims, err
}
}

RELEASE_CLAIM:
// (3) release the claim from the state store, allowing it to be rescheduled
err = checkpoint(structs.CSIVolumeClaimStateReadyToFree)
if err != nil {
return args.nodeClaims, err
}
return args.nodeClaims, nil
err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
return err
}
Loading

0 comments on commit 588acb5

Please sign in to comment.