Skip to content

Commit

Permalink
Periodic GC for volume claims (#7881)
Browse files Browse the repository at this point in the history
This changeset implements a periodic garbage collection of CSI volumes
with missing allocations. This can happen in a scenario where a node
update fails partially and the allocation updates are written to raft
but the evaluations to GC the volumes are dropped. This feature will
cover this edge case and ensure that upgrades from 0.11.0 and 0.11.1
get any stray claims cleaned up.
  • Loading branch information
tgross authored May 11, 2020
1 parent 62316a4 commit 8192aa6
Show file tree
Hide file tree
Showing 10 changed files with 361 additions and 72 deletions.
7 changes: 7 additions & 0 deletions command/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,13 @@ func convertServerConfig(agentConfig *Config) (*nomad.Config, error) {
}
conf.DeploymentGCThreshold = dur
}
if gcThreshold := agentConfig.Server.CSIVolumeClaimGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
return nil, err
}
conf.CSIVolumeClaimGCThreshold = dur
}
if gcThreshold := agentConfig.Server.CSIPluginGCThreshold; gcThreshold != "" {
dur, err := time.ParseDuration(gcThreshold)
if err != nil {
Expand Down
8 changes: 8 additions & 0 deletions command/agent/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,6 +389,11 @@ type ServerConfig struct {
// GCed but the threshold can be used to filter by age.
DeploymentGCThreshold string `hcl:"deployment_gc_threshold"`

// CSIVolumeClaimGCThreshold controls how "old" a CSI volume must be to
// have its claims collected by GC. Age is not the only requirement for
// a volume to be GCed but the threshold can be used to filter by age.
CSIVolumeClaimGCThreshold string `hcl:"csi_volume_claim_gc_threshold"`

// CSIPluginGCThreshold controls how "old" a CSI plugin must be to be
// collected by GC. Age is not the only requirement for a plugin to be
// GCed but the threshold can be used to filter by age.
Expand Down Expand Up @@ -1328,6 +1333,9 @@ func (a *ServerConfig) Merge(b *ServerConfig) *ServerConfig {
if b.DeploymentGCThreshold != "" {
result.DeploymentGCThreshold = b.DeploymentGCThreshold
}
if b.CSIVolumeClaimGCThreshold != "" {
result.CSIVolumeClaimGCThreshold = b.CSIVolumeClaimGCThreshold
}
if b.CSIPluginGCThreshold != "" {
result.CSIPluginGCThreshold = b.CSIPluginGCThreshold
}
Expand Down
59 changes: 30 additions & 29 deletions command/agent/config_parse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,35 +91,36 @@ var basicConfig = &Config{
BridgeNetworkSubnet: "custom_bridge_subnet",
},
Server: &ServerConfig{
Enabled: true,
AuthoritativeRegion: "foobar",
BootstrapExpect: 5,
DataDir: "/tmp/data",
ProtocolVersion: 3,
RaftProtocol: 3,
NumSchedulers: helper.IntToPtr(2),
EnabledSchedulers: []string{"test"},
NodeGCThreshold: "12h",
EvalGCThreshold: "12h",
JobGCInterval: "3m",
JobGCThreshold: "12h",
DeploymentGCThreshold: "12h",
CSIPluginGCThreshold: "12h",
HeartbeatGrace: 30 * time.Second,
HeartbeatGraceHCL: "30s",
MinHeartbeatTTL: 33 * time.Second,
MinHeartbeatTTLHCL: "33s",
MaxHeartbeatsPerSecond: 11.0,
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
StartJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: 15 * time.Second,
RetryIntervalHCL: "15s",
RejoinAfterLeave: true,
RetryMaxAttempts: 3,
NonVotingServer: true,
RedundancyZone: "foo",
UpgradeVersion: "0.8.0",
EncryptKey: "abc",
Enabled: true,
AuthoritativeRegion: "foobar",
BootstrapExpect: 5,
DataDir: "/tmp/data",
ProtocolVersion: 3,
RaftProtocol: 3,
NumSchedulers: helper.IntToPtr(2),
EnabledSchedulers: []string{"test"},
NodeGCThreshold: "12h",
EvalGCThreshold: "12h",
JobGCInterval: "3m",
JobGCThreshold: "12h",
DeploymentGCThreshold: "12h",
CSIVolumeClaimGCThreshold: "12h",
CSIPluginGCThreshold: "12h",
HeartbeatGrace: 30 * time.Second,
HeartbeatGraceHCL: "30s",
MinHeartbeatTTL: 33 * time.Second,
MinHeartbeatTTLHCL: "33s",
MaxHeartbeatsPerSecond: 11.0,
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
StartJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: 15 * time.Second,
RetryIntervalHCL: "15s",
RejoinAfterLeave: true,
RetryMaxAttempts: 3,
NonVotingServer: true,
RedundancyZone: "foo",
UpgradeVersion: "0.8.0",
EncryptKey: "abc",
ServerJoin: &ServerJoin{
RetryJoin: []string{"1.1.1.1", "2.2.2.2"},
RetryInterval: time.Duration(15) * time.Second,
Expand Down
53 changes: 27 additions & 26 deletions command/agent/testdata/basic.hcl
Original file line number Diff line number Diff line change
Expand Up @@ -102,32 +102,33 @@ client {
}

server {
enabled = true
authoritative_region = "foobar"
bootstrap_expect = 5
data_dir = "/tmp/data"
protocol_version = 3
raft_protocol = 3
num_schedulers = 2
enabled_schedulers = ["test"]
node_gc_threshold = "12h"
job_gc_interval = "3m"
job_gc_threshold = "12h"
eval_gc_threshold = "12h"
deployment_gc_threshold = "12h"
csi_plugin_gc_threshold = "12h"
heartbeat_grace = "30s"
min_heartbeat_ttl = "33s"
max_heartbeats_per_second = 11.0
retry_join = ["1.1.1.1", "2.2.2.2"]
start_join = ["1.1.1.1", "2.2.2.2"]
retry_max = 3
retry_interval = "15s"
rejoin_after_leave = true
non_voting_server = true
redundancy_zone = "foo"
upgrade_version = "0.8.0"
encrypt = "abc"
enabled = true
authoritative_region = "foobar"
bootstrap_expect = 5
data_dir = "/tmp/data"
protocol_version = 3
raft_protocol = 3
num_schedulers = 2
enabled_schedulers = ["test"]
node_gc_threshold = "12h"
job_gc_interval = "3m"
job_gc_threshold = "12h"
eval_gc_threshold = "12h"
deployment_gc_threshold = "12h"
csi_volume_claim_gc_threshold = "12h"
csi_plugin_gc_threshold = "12h"
heartbeat_grace = "30s"
min_heartbeat_ttl = "33s"
max_heartbeats_per_second = 11.0
retry_join = ["1.1.1.1", "2.2.2.2"]
start_join = ["1.1.1.1", "2.2.2.2"]
retry_max = 3
retry_interval = "15s"
rejoin_after_leave = true
non_voting_server = true
redundancy_zone = "foo"
upgrade_version = "0.8.0"
encrypt = "abc"

server_join {
retry_join = ["1.1.1.1", "2.2.2.2"]
Expand Down
1 change: 1 addition & 0 deletions command/agent/testdata/basic.json
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,7 @@
"authoritative_region": "foobar",
"bootstrap_expect": 5,
"csi_plugin_gc_threshold": "12h",
"csi_volume_claim_gc_threshold": "12h",
"data_dir": "/tmp/data",
"deployment_gc_threshold": "12h",
"enabled": true,
Expand Down
10 changes: 10 additions & 0 deletions nomad/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,14 @@ type Config struct {
// for GC. This gives users some time to debug plugins.
CSIPluginGCThreshold time.Duration

// CSIVolumeClaimGCInterval is how often we dispatch a job to GC
// volume claims.
CSIVolumeClaimGCInterval time.Duration

// CSIVolumeClaimGCThreshold is how "old" a volume must be to be
// eligible for GC. This gives users some time to debug volumes.
CSIVolumeClaimGCThreshold time.Duration

// EvalNackTimeout controls how long we allow a sub-scheduler to
// work on an evaluation before we consider it failed and Nack it.
// This allows that evaluation to be handed to another sub-scheduler
Expand Down Expand Up @@ -386,6 +394,8 @@ func DefaultConfig() *Config {
DeploymentGCThreshold: 1 * time.Hour,
CSIPluginGCInterval: 5 * time.Minute,
CSIPluginGCThreshold: 1 * time.Hour,
CSIVolumeClaimGCInterval: 5 * time.Minute,
CSIVolumeClaimGCThreshold: 1 * time.Hour,
EvalNackTimeout: 60 * time.Second,
EvalDeliveryLimit: 3,
EvalNackInitialReenqueueDelay: 1 * time.Second,
Expand Down
102 changes: 88 additions & 14 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ func (c *CoreScheduler) forceGC(eval *structs.Evaluation) error {
if err := c.csiPluginGC(eval); err != nil {
return err
}
if err := c.csiVolumeClaimGC(eval); err != nil {
return err
}

// Node GC must occur after the others to ensure the allocations are
// cleared.
Expand Down Expand Up @@ -714,32 +717,103 @@ func allocGCEligible(a *structs.Allocation, job *structs.Job, gcTime time.Time,
return timeDiff > interval.Nanoseconds()
}

// TODO: we need a periodic trigger to iterate over all the volumes and split
// them up into separate work items, same as we do for jobs.

// csiVolumeClaimGC is used to garbage collect CSI volume claims
func (c *CoreScheduler) csiVolumeClaimGC(eval *structs.Evaluation) error {

gcClaims := func(ns, volID string) error {
req := &structs.CSIVolumeClaimRequest{
VolumeID: volID,
Claim: structs.CSIVolumeClaimRelease,
}
req.Namespace = ns
req.Region = c.srv.config.Region
err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
return err
}

c.logger.Trace("garbage collecting unclaimed CSI volume claims", "eval.JobID", eval.JobID)

// Volume ID smuggled in with the eval's own JobID
evalVolID := strings.Split(eval.JobID, ":")

// COMPAT(1.0): 0.11.0 shipped with 3 fields. tighten this check to len == 2
if len(evalVolID) < 2 {
c.logger.Error("volume gc called without volID")
return nil
if len(evalVolID) > 1 {
volID := evalVolID[1]
return gcClaims(eval.Namespace, volID)
}

ws := memdb.NewWatchSet()

iter, err := c.snap.CSIVolumes(ws)
if err != nil {
return err
}

// Get the time table to calculate GC cutoffs.
var oldThreshold uint64
if eval.JobID == structs.CoreJobForceGC {
// The GC was forced, so set the threshold to its maximum so
// everything will GC.
oldThreshold = math.MaxUint64
c.logger.Debug("forced volume claim GC")
} else {
tt := c.srv.fsm.TimeTable()
cutoff := time.Now().UTC().Add(-1 * c.srv.config.CSIVolumeClaimGCThreshold)
oldThreshold = tt.NearestIndex(cutoff)
}

volID := evalVolID[1]
req := &structs.CSIVolumeClaimRequest{
VolumeID: volID,
Claim: structs.CSIVolumeClaimRelease,
c.logger.Debug("CSI volume claim GC scanning before cutoff index",
"index", oldThreshold,
"csi_volume_claim_gc_threshold", c.srv.config.CSIVolumeClaimGCThreshold)

NEXT_VOLUME:
for i := iter.Next(); i != nil; i = iter.Next() {
vol := i.(*structs.CSIVolume)

// Ignore new volumes
if vol.CreateIndex > oldThreshold {
continue
}

// we only call the claim release RPC if the volume has claims
// that no longer have valid allocations. otherwise we'd send
// out a lot of do-nothing RPCs.
for id := range vol.ReadClaims {
alloc, err := c.snap.AllocByID(ws, id)
if err != nil {
return err
}
if alloc == nil {
err = gcClaims(vol.Namespace, vol.ID)
if err != nil {
return err
}
goto NEXT_VOLUME
}
}
for id := range vol.WriteClaims {
alloc, err := c.snap.AllocByID(ws, id)
if err != nil {
return err
}
if alloc == nil {
err = gcClaims(vol.Namespace, vol.ID)
if err != nil {
return err
}
goto NEXT_VOLUME
}
}
if len(vol.PastClaims) > 0 {
err = gcClaims(vol.Namespace, vol.ID)
if err != nil {
return err
}
}

}
req.Namespace = eval.Namespace
req.Region = c.srv.config.Region
return nil

err := c.srv.RPC("CSIVolume.Claim", req, &structs.CSIVolumeClaimResponse{})
return err
}

// csiPluginGC is used to garbage collect unused plugins
Expand Down
Loading

0 comments on commit 8192aa6

Please sign in to comment.