Skip to content

Commit

Permalink
Merge pull request #3982 from hashicorp/b-shard-jobs
Browse files Browse the repository at this point in the history
Batch Job Garbage Collection
  • Loading branch information
dadgar authored Mar 16, 2018
2 parents 0cc4e3c + 8be12ee commit 91a2ed4
Show file tree
Hide file tree
Showing 8 changed files with 705 additions and 327 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ __BACKWARDS INCOMPATIBILITIES:__
[restart stanza](https://www.nomadproject.io/docs/job-specification/restart.html) for more information.

IMPROVEMENTS:
* core: Servers can now service client HTTP endpoints [[GH-3892](https://github.com/hashicorp/nomad/issues/3892)]
* core: More efficient garbage collection of large batches of jobs [[GH-3982](https://github.com/hashicorp/nomad/issues/3982)]
* core: Allow upgrading/downgrading TLS via SIGHUP on both servers and clients [[GH-3492](https://github.com/hashicorp/nomad/issues/3492)]
* core: Node events are emitted for events such as node registration and
heartbeating [[GH-3945](https://github.com/hashicorp/nomad/issues/3945)]
Expand Down
59 changes: 47 additions & 12 deletions nomad/core_sched.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,25 +151,60 @@ OUTER:
return err
}

// Call to the leader to deregister the jobs.
for _, job := range gcJob {
req := structs.JobDeregisterRequest{
JobID: job.ID,
Purge: true,
// Reap the jobs
return c.jobReap(gcJob, eval.LeaderACL)
}

// jobReap contacts the leader and issues a reap on the passed jobs
func (c *CoreScheduler) jobReap(jobs []*structs.Job, leaderACL string) error {
// Call to the leader to issue the reap
for _, req := range c.partitionJobReap(jobs, leaderACL) {
var resp structs.JobBatchDeregisterResponse
if err := c.srv.RPC("Job.BatchDeregister", req, &resp); err != nil {
c.srv.logger.Printf("[ERR] sched.core: batch job reap failed: %v", err)
return err
}
}

return nil
}

// partitionJobReap returns a list of JobBatchDeregisterRequests to make,
// ensuring a single request does not contain too many jobs. This is necessary
// to ensure that the Raft transaction does not become too large.
func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string) []*structs.JobBatchDeregisterRequest {
option := &structs.JobDeregisterOptions{Purge: true}
var requests []*structs.JobBatchDeregisterRequest
submittedJobs := 0
for submittedJobs != len(jobs) {
req := &structs.JobBatchDeregisterRequest{
Jobs: make(map[structs.NamespacedID]*structs.JobDeregisterOptions),
WriteRequest: structs.WriteRequest{
Region: c.srv.config.Region,
Namespace: job.Namespace,
AuthToken: eval.LeaderACL,
AuthToken: leaderACL,
},
}
var resp structs.JobDeregisterResponse
if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil {
c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err)
return err
requests = append(requests, req)
available := maxIdsPerReap

if remaining := len(jobs) - submittedJobs; remaining > 0 {
if remaining <= available {
for _, job := range jobs[submittedJobs:] {
jns := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace}
req.Jobs[jns] = option
}
submittedJobs += remaining
} else {
for _, job := range jobs[submittedJobs : submittedJobs+available] {
jns := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace}
req.Jobs[jns] = option
}
submittedJobs += available
}
}
}

return nil
return requests
}

// evalGC is used to garbage collect old evaluations
Expand Down
27 changes: 27 additions & 0 deletions nomad/core_sched_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1767,6 +1767,33 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) {
}
}

func TestCoreScheduler_PartitionJobReap(t *testing.T) {
t.Parallel()
require := require.New(t)
s1 := TestServer(t, nil)
defer s1.Shutdown()
testutil.WaitForLeader(t, s1.RPC)

// Create a core scheduler
snap, err := s1.fsm.State().Snapshot()
if err != nil {
t.Fatalf("err: %v", err)
}
core := NewCoreScheduler(s1, snap)

// Set the max ids per reap to something lower.
maxIdsPerReap = 2

jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()}
requests := core.(*CoreScheduler).partitionJobReap(jobs, "")
require.Len(requests, 2)

first := requests[0]
second := requests[1]
require.Len(first.Jobs, 2)
require.Len(second.Jobs, 1)
}

// Tests various scenarios when allocations are eligible to be GCed
func TestAllocation_GCEligible(t *testing.T) {
type testCase struct {
Expand Down
41 changes: 35 additions & 6 deletions nomad/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} {
return n.applyAutopilotUpdate(buf[1:], log.Index)
case structs.UpsertNodeEventsType:
return n.applyUpsertNodeEvent(buf[1:], log.Index)
case structs.JobBatchDeregisterRequestType:
return n.applyBatchDeregisterJob(buf[1:], log.Index)
}

// Check enterprise only message types.
Expand Down Expand Up @@ -431,33 +433,60 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} {
panic(fmt.Errorf("failed to decode request: %v", err))
}

if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge); err != nil {
n.logger.Printf("[ERR] nomad.fsm: deregistering job failed: %v", err)
return err
}

return nil
}

func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} {
defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_job"}, time.Now())
var req structs.JobBatchDeregisterRequest
if err := structs.Decode(buf, &req); err != nil {
panic(fmt.Errorf("failed to decode request: %v", err))
}

for jobNS, options := range req.Jobs {
if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge); err != nil {
n.logger.Printf("[ERR] nomad.fsm: deregistering %v failed: %v", jobNS, err)
return err
}
}

return n.upsertEvals(index, req.Evals)
}

// handleJobDeregister is used to deregister a job.
func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool) error {
// If it is periodic remove it from the dispatcher
if err := n.periodicDispatcher.Remove(req.Namespace, req.JobID); err != nil {
if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Remove failed: %v", err)
return err
}

if req.Purge {
if err := n.state.DeleteJob(index, req.Namespace, req.JobID); err != nil {
if purge {
if err := n.state.DeleteJob(index, namespace, jobID); err != nil {
n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err)
return err
}

// We always delete from the periodic launch table because it is possible that
// the job was updated to be non-periodic, thus checking if it is periodic
// doesn't ensure we clean it up properly.
n.state.DeletePeriodicLaunch(index, req.Namespace, req.JobID)
n.state.DeletePeriodicLaunch(index, namespace, jobID)
} else {
// Get the current job and mark it as stopped and re-insert it.
ws := memdb.NewWatchSet()
current, err := n.state.JobByID(ws, req.Namespace, req.JobID)
current, err := n.state.JobByID(ws, namespace, jobID)
if err != nil {
n.logger.Printf("[ERR] nomad.fsm: JobByID lookup failed: %v", err)
return err
}

if current == nil {
return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", req.JobID, req.Namespace)
return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", jobID, namespace)
}

stopped := current.Copy()
Expand Down
78 changes: 78 additions & 0 deletions nomad/fsm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,84 @@ func TestFSM_DeregisterJob_NoPurge(t *testing.T) {
}
}

func TestFSM_BatchDeregisterJob(t *testing.T) {
t.Parallel()
require := require.New(t)
fsm := testFSM(t)

job := mock.PeriodicJob()
req := structs.JobRegisterRequest{
Job: job,
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err := structs.Encode(structs.JobRegisterRequestType, req)
require.Nil(err)
resp := fsm.Apply(makeLog(buf))
require.Nil(resp)

job2 := mock.Job()
req2 := structs.JobRegisterRequest{
Job: job2,
WriteRequest: structs.WriteRequest{
Namespace: job2.Namespace,
},
}

buf, err = structs.Encode(structs.JobRegisterRequestType, req2)
require.Nil(err)
resp = fsm.Apply(makeLog(buf))
require.Nil(resp)

req3 := structs.JobBatchDeregisterRequest{
Jobs: map[structs.NamespacedID]*structs.JobDeregisterOptions{
structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}: &structs.JobDeregisterOptions{},
structs.NamespacedID{
ID: job2.ID,
Namespace: job2.Namespace,
}: &structs.JobDeregisterOptions{
Purge: true,
},
},
WriteRequest: structs.WriteRequest{
Namespace: job.Namespace,
},
}
buf, err = structs.Encode(structs.JobBatchDeregisterRequestType, req3)
require.Nil(err)

resp = fsm.Apply(makeLog(buf))
require.Nil(resp)

// Verify we are NOT registered
ws := memdb.NewWatchSet()
jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID)
require.Nil(err)
require.NotNil(jobOut)
require.True(jobOut.Stop)

// Verify it was removed from the periodic runner.
tuple := structs.NamespacedID{
ID: job.ID,
Namespace: job.Namespace,
}
require.NotContains(fsm.periodicDispatcher.tracked, tuple)

// Verify it was not removed from the periodic launch table.
launchOut, err := fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID)
require.Nil(err)
require.NotNil(launchOut)

// Verify the other jbo was purged
jobOut2, err := fsm.State().JobByID(ws, job2.Namespace, job2.ID)
require.Nil(err)
require.Nil(jobOut2)
}

func TestFSM_UpdateEval(t *testing.T) {
t.Parallel()
fsm := testFSM(t)
Expand Down
85 changes: 83 additions & 2 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -615,8 +615,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD

// Create a new evaluation
// XXX: The job priority / type is strange for this, since it's not a high
// priority even if the job was. The scheduler itself also doesn't matter,
// since all should be able to handle deregistration in the same way.
// priority even if the job was.
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: args.RequestNamespace(),
Expand Down Expand Up @@ -646,6 +645,88 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD
return nil
}

// BatchDeregister is used to remove a set of jobs from the cluster.
func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *structs.JobBatchDeregisterResponse) error {
if done, err := j.srv.forward("Job.BatchDeregister", args, args, reply); done {
return err
}
defer metrics.MeasureSince([]string{"nomad", "job", "batch_deregister"}, time.Now())

// Resolve the ACL token
aclObj, err := j.srv.ResolveToken(args.AuthToken)
if err != nil {
return err
}

// Validate the arguments
if len(args.Jobs) == 0 {
return fmt.Errorf("given no jobs to deregister")
}
if len(args.Evals) != 0 {
return fmt.Errorf("evaluations should not be populated")
}

// Loop through checking for permissions
for jobNS := range args.Jobs {
// Check for submit-job permissions
if aclObj != nil && !aclObj.AllowNsOp(jobNS.Namespace, acl.NamespaceCapabilitySubmitJob) {
return structs.ErrPermissionDenied
}
}

// Grab a snapshot
snap, err := j.srv.fsm.State().Snapshot()
if err != nil {
return err
}

// Loop through to create evals
for jobNS, options := range args.Jobs {
if options == nil {
return fmt.Errorf("no deregister options provided for %v", jobNS)
}

job, err := snap.JobByID(nil, jobNS.Namespace, jobNS.ID)
if err != nil {
return err
}

// If the job is periodic or parameterized, we don't create an eval.
if job != nil && (job.IsPeriodic() || job.IsParameterized()) {
continue
}

priority := structs.JobDefaultPriority
jtype := structs.JobTypeService
if job != nil {
priority = job.Priority
jtype = job.Type
}

// Create a new evaluation
eval := &structs.Evaluation{
ID: uuid.Generate(),
Namespace: jobNS.Namespace,
Priority: priority,
Type: jtype,
TriggeredBy: structs.EvalTriggerJobDeregister,
JobID: jobNS.ID,
Status: structs.EvalStatusPending,
}
args.Evals = append(args.Evals, eval)
}

// Commit this update via Raft
_, index, err := j.srv.raftApply(structs.JobBatchDeregisterRequestType, args)
if err != nil {
j.srv.logger.Printf("[ERR] nomad.job: batch deregister failed: %v", err)
return err
}

reply.Index = index
return nil
}

// GetJob is used to request information about a specific job
func (j *Job) GetJob(args *structs.JobSpecificRequest,
reply *structs.SingleJobResponse) error {
Expand Down
Loading

0 comments on commit 91a2ed4

Please sign in to comment.