diff --git a/CHANGELOG.md b/CHANGELOG.md index f1227aec920..6ed244c3d6f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,8 @@ __BACKWARDS INCOMPATIBILITIES:__ [restart stanza](https://www.nomadproject.io/docs/job-specification/restart.html) for more information. IMPROVEMENTS: + * core: Servers can now service client HTTP endpoints [[GH-3892](https://github.com/hashicorp/nomad/issues/3892)] + * core: More efficient garbage collection of large batches of jobs [[GH-3982](https://github.com/hashicorp/nomad/issues/3982)] * core: Allow upgrading/downgrading TLS via SIGHUP on both servers and clients [[GH-3492](https://github.com/hashicorp/nomad/issues/3492)] * core: Node events are emitted for events such as node registration and heartbeating [[GH-3945](https://github.com/hashicorp/nomad/issues/3945)] diff --git a/nomad/core_sched.go b/nomad/core_sched.go index 0786af49727..a3277f8589f 100644 --- a/nomad/core_sched.go +++ b/nomad/core_sched.go @@ -151,25 +151,60 @@ OUTER: return err } - // Call to the leader to deregister the jobs. - for _, job := range gcJob { - req := structs.JobDeregisterRequest{ - JobID: job.ID, - Purge: true, + // Reap the jobs + return c.jobReap(gcJob, eval.LeaderACL) +} + +// jobReap contacts the leader and issues a reap on the passed jobs +func (c *CoreScheduler) jobReap(jobs []*structs.Job, leaderACL string) error { + // Call to the leader to issue the reap + for _, req := range c.partitionJobReap(jobs, leaderACL) { + var resp structs.JobBatchDeregisterResponse + if err := c.srv.RPC("Job.BatchDeregister", req, &resp); err != nil { + c.srv.logger.Printf("[ERR] sched.core: batch job reap failed: %v", err) + return err + } + } + + return nil +} + +// partitionJobReap returns a list of JobBatchDeregisterRequests to make, +// ensuring a single request does not contain too many jobs. This is necessary +// to ensure that the Raft transaction does not become too large. +func (c *CoreScheduler) partitionJobReap(jobs []*structs.Job, leaderACL string) []*structs.JobBatchDeregisterRequest { + option := &structs.JobDeregisterOptions{Purge: true} + var requests []*structs.JobBatchDeregisterRequest + submittedJobs := 0 + for submittedJobs != len(jobs) { + req := &structs.JobBatchDeregisterRequest{ + Jobs: make(map[structs.NamespacedID]*structs.JobDeregisterOptions), WriteRequest: structs.WriteRequest{ Region: c.srv.config.Region, - Namespace: job.Namespace, - AuthToken: eval.LeaderACL, + AuthToken: leaderACL, }, } - var resp structs.JobDeregisterResponse - if err := c.srv.RPC("Job.Deregister", &req, &resp); err != nil { - c.srv.logger.Printf("[ERR] sched.core: job deregister failed: %v", err) - return err + requests = append(requests, req) + available := maxIdsPerReap + + if remaining := len(jobs) - submittedJobs; remaining > 0 { + if remaining <= available { + for _, job := range jobs[submittedJobs:] { + jns := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace} + req.Jobs[jns] = option + } + submittedJobs += remaining + } else { + for _, job := range jobs[submittedJobs : submittedJobs+available] { + jns := structs.NamespacedID{ID: job.ID, Namespace: job.Namespace} + req.Jobs[jns] = option + } + submittedJobs += available + } } } - return nil + return requests } // evalGC is used to garbage collect old evaluations diff --git a/nomad/core_sched_test.go b/nomad/core_sched_test.go index a1ab8075451..68ac7dbc853 100644 --- a/nomad/core_sched_test.go +++ b/nomad/core_sched_test.go @@ -1767,6 +1767,33 @@ func TestCoreScheduler_PartitionDeploymentReap(t *testing.T) { } } +func TestCoreScheduler_PartitionJobReap(t *testing.T) { + t.Parallel() + require := require.New(t) + s1 := TestServer(t, nil) + defer s1.Shutdown() + testutil.WaitForLeader(t, s1.RPC) + + // Create a core scheduler + snap, err := s1.fsm.State().Snapshot() + if err != nil { + t.Fatalf("err: %v", err) + } + core := NewCoreScheduler(s1, snap) + + // Set the max ids per reap to something lower. + maxIdsPerReap = 2 + + jobs := []*structs.Job{mock.Job(), mock.Job(), mock.Job()} + requests := core.(*CoreScheduler).partitionJobReap(jobs, "") + require.Len(requests, 2) + + first := requests[0] + second := requests[1] + require.Len(first.Jobs, 2) + require.Len(second.Jobs, 1) +} + // Tests various scenarios when allocations are eligible to be GCed func TestAllocation_GCEligible(t *testing.T) { type testCase struct { diff --git a/nomad/fsm.go b/nomad/fsm.go index 65344e72473..21a785b6750 100644 --- a/nomad/fsm.go +++ b/nomad/fsm.go @@ -238,6 +238,8 @@ func (n *nomadFSM) Apply(log *raft.Log) interface{} { return n.applyAutopilotUpdate(buf[1:], log.Index) case structs.UpsertNodeEventsType: return n.applyUpsertNodeEvent(buf[1:], log.Index) + case structs.JobBatchDeregisterRequestType: + return n.applyBatchDeregisterJob(buf[1:], log.Index) } // Check enterprise only message types. @@ -431,14 +433,41 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { panic(fmt.Errorf("failed to decode request: %v", err)) } + if err := n.handleJobDeregister(index, req.JobID, req.Namespace, req.Purge); err != nil { + n.logger.Printf("[ERR] nomad.fsm: deregistering job failed: %v", err) + return err + } + + return nil +} + +func (n *nomadFSM) applyBatchDeregisterJob(buf []byte, index uint64) interface{} { + defer metrics.MeasureSince([]string{"nomad", "fsm", "batch_deregister_job"}, time.Now()) + var req structs.JobBatchDeregisterRequest + if err := structs.Decode(buf, &req); err != nil { + panic(fmt.Errorf("failed to decode request: %v", err)) + } + + for jobNS, options := range req.Jobs { + if err := n.handleJobDeregister(index, jobNS.ID, jobNS.Namespace, options.Purge); err != nil { + n.logger.Printf("[ERR] nomad.fsm: deregistering %v failed: %v", jobNS, err) + return err + } + } + + return n.upsertEvals(index, req.Evals) +} + +// handleJobDeregister is used to deregister a job. +func (n *nomadFSM) handleJobDeregister(index uint64, jobID, namespace string, purge bool) error { // If it is periodic remove it from the dispatcher - if err := n.periodicDispatcher.Remove(req.Namespace, req.JobID); err != nil { + if err := n.periodicDispatcher.Remove(namespace, jobID); err != nil { n.logger.Printf("[ERR] nomad.fsm: periodicDispatcher.Remove failed: %v", err) return err } - if req.Purge { - if err := n.state.DeleteJob(index, req.Namespace, req.JobID); err != nil { + if purge { + if err := n.state.DeleteJob(index, namespace, jobID); err != nil { n.logger.Printf("[ERR] nomad.fsm: DeleteJob failed: %v", err) return err } @@ -446,18 +475,18 @@ func (n *nomadFSM) applyDeregisterJob(buf []byte, index uint64) interface{} { // We always delete from the periodic launch table because it is possible that // the job was updated to be non-periodic, thus checking if it is periodic // doesn't ensure we clean it up properly. - n.state.DeletePeriodicLaunch(index, req.Namespace, req.JobID) + n.state.DeletePeriodicLaunch(index, namespace, jobID) } else { // Get the current job and mark it as stopped and re-insert it. ws := memdb.NewWatchSet() - current, err := n.state.JobByID(ws, req.Namespace, req.JobID) + current, err := n.state.JobByID(ws, namespace, jobID) if err != nil { n.logger.Printf("[ERR] nomad.fsm: JobByID lookup failed: %v", err) return err } if current == nil { - return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", req.JobID, req.Namespace) + return fmt.Errorf("job %q in namespace %q doesn't exist to be deregistered", jobID, namespace) } stopped := current.Copy() diff --git a/nomad/fsm_test.go b/nomad/fsm_test.go index 11e49c9d8b7..d91e821d7ab 100644 --- a/nomad/fsm_test.go +++ b/nomad/fsm_test.go @@ -612,6 +612,84 @@ func TestFSM_DeregisterJob_NoPurge(t *testing.T) { } } +func TestFSM_BatchDeregisterJob(t *testing.T) { + t.Parallel() + require := require.New(t) + fsm := testFSM(t) + + job := mock.PeriodicJob() + req := structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Namespace: job.Namespace, + }, + } + buf, err := structs.Encode(structs.JobRegisterRequestType, req) + require.Nil(err) + resp := fsm.Apply(makeLog(buf)) + require.Nil(resp) + + job2 := mock.Job() + req2 := structs.JobRegisterRequest{ + Job: job2, + WriteRequest: structs.WriteRequest{ + Namespace: job2.Namespace, + }, + } + + buf, err = structs.Encode(structs.JobRegisterRequestType, req2) + require.Nil(err) + resp = fsm.Apply(makeLog(buf)) + require.Nil(resp) + + req3 := structs.JobBatchDeregisterRequest{ + Jobs: map[structs.NamespacedID]*structs.JobDeregisterOptions{ + structs.NamespacedID{ + ID: job.ID, + Namespace: job.Namespace, + }: &structs.JobDeregisterOptions{}, + structs.NamespacedID{ + ID: job2.ID, + Namespace: job2.Namespace, + }: &structs.JobDeregisterOptions{ + Purge: true, + }, + }, + WriteRequest: structs.WriteRequest{ + Namespace: job.Namespace, + }, + } + buf, err = structs.Encode(structs.JobBatchDeregisterRequestType, req3) + require.Nil(err) + + resp = fsm.Apply(makeLog(buf)) + require.Nil(resp) + + // Verify we are NOT registered + ws := memdb.NewWatchSet() + jobOut, err := fsm.State().JobByID(ws, req.Namespace, req.Job.ID) + require.Nil(err) + require.NotNil(jobOut) + require.True(jobOut.Stop) + + // Verify it was removed from the periodic runner. + tuple := structs.NamespacedID{ + ID: job.ID, + Namespace: job.Namespace, + } + require.NotContains(fsm.periodicDispatcher.tracked, tuple) + + // Verify it was not removed from the periodic launch table. + launchOut, err := fsm.State().PeriodicLaunchByID(ws, job.Namespace, job.ID) + require.Nil(err) + require.NotNil(launchOut) + + // Verify the other jbo was purged + jobOut2, err := fsm.State().JobByID(ws, job2.Namespace, job2.ID) + require.Nil(err) + require.Nil(jobOut2) +} + func TestFSM_UpdateEval(t *testing.T) { t.Parallel() fsm := testFSM(t) diff --git a/nomad/job_endpoint.go b/nomad/job_endpoint.go index 9b60b48f663..04360d49b81 100644 --- a/nomad/job_endpoint.go +++ b/nomad/job_endpoint.go @@ -615,8 +615,7 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD // Create a new evaluation // XXX: The job priority / type is strange for this, since it's not a high - // priority even if the job was. The scheduler itself also doesn't matter, - // since all should be able to handle deregistration in the same way. + // priority even if the job was. eval := &structs.Evaluation{ ID: uuid.Generate(), Namespace: args.RequestNamespace(), @@ -646,6 +645,88 @@ func (j *Job) Deregister(args *structs.JobDeregisterRequest, reply *structs.JobD return nil } +// BatchDeregister is used to remove a set of jobs from the cluster. +func (j *Job) BatchDeregister(args *structs.JobBatchDeregisterRequest, reply *structs.JobBatchDeregisterResponse) error { + if done, err := j.srv.forward("Job.BatchDeregister", args, args, reply); done { + return err + } + defer metrics.MeasureSince([]string{"nomad", "job", "batch_deregister"}, time.Now()) + + // Resolve the ACL token + aclObj, err := j.srv.ResolveToken(args.AuthToken) + if err != nil { + return err + } + + // Validate the arguments + if len(args.Jobs) == 0 { + return fmt.Errorf("given no jobs to deregister") + } + if len(args.Evals) != 0 { + return fmt.Errorf("evaluations should not be populated") + } + + // Loop through checking for permissions + for jobNS := range args.Jobs { + // Check for submit-job permissions + if aclObj != nil && !aclObj.AllowNsOp(jobNS.Namespace, acl.NamespaceCapabilitySubmitJob) { + return structs.ErrPermissionDenied + } + } + + // Grab a snapshot + snap, err := j.srv.fsm.State().Snapshot() + if err != nil { + return err + } + + // Loop through to create evals + for jobNS, options := range args.Jobs { + if options == nil { + return fmt.Errorf("no deregister options provided for %v", jobNS) + } + + job, err := snap.JobByID(nil, jobNS.Namespace, jobNS.ID) + if err != nil { + return err + } + + // If the job is periodic or parameterized, we don't create an eval. + if job != nil && (job.IsPeriodic() || job.IsParameterized()) { + continue + } + + priority := structs.JobDefaultPriority + jtype := structs.JobTypeService + if job != nil { + priority = job.Priority + jtype = job.Type + } + + // Create a new evaluation + eval := &structs.Evaluation{ + ID: uuid.Generate(), + Namespace: jobNS.Namespace, + Priority: priority, + Type: jtype, + TriggeredBy: structs.EvalTriggerJobDeregister, + JobID: jobNS.ID, + Status: structs.EvalStatusPending, + } + args.Evals = append(args.Evals, eval) + } + + // Commit this update via Raft + _, index, err := j.srv.raftApply(structs.JobBatchDeregisterRequestType, args) + if err != nil { + j.srv.logger.Printf("[ERR] nomad.job: batch deregister failed: %v", err) + return err + } + + reply.Index = index + return nil +} + // GetJob is used to request information about a specific job func (j *Job) GetJob(args *structs.JobSpecificRequest, reply *structs.SingleJobResponse) error { diff --git a/nomad/job_endpoint_test.go b/nomad/job_endpoint_test.go index 3b84ab2ae80..7de9d3ef7a9 100644 --- a/nomad/job_endpoint_test.go +++ b/nomad/job_endpoint_test.go @@ -16,7 +16,7 @@ import ( "github.com/hashicorp/nomad/nomad/structs" "github.com/hashicorp/nomad/testutil" "github.com/kr/pretty" - "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) func TestJobEndpoint_Register(t *testing.T) { @@ -1026,7 +1026,7 @@ func TestJobEndpoint_Revert(t *testing.T) { func TestJobEndpoint_Revert_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) s1, root := TestACLServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -1040,12 +1040,12 @@ func TestJobEndpoint_Revert_ACL(t *testing.T) { // Create the job job := mock.Job() err := state.UpsertJob(300, job) - assert.Nil(err) + require.Nil(err) job2 := job.Copy() job2.Priority = 1 err = state.UpsertJob(400, job2) - assert.Nil(err) + require.Nil(err) // Create revert request and enforcing it be at the current version revertReq := &structs.JobRevertRequest{ @@ -1060,8 +1060,8 @@ func TestJobEndpoint_Revert_ACL(t *testing.T) { // Attempt to fetch the response without a valid token var resp structs.JobRegisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &resp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Attempt to fetch the response with an invalid token invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", @@ -1070,14 +1070,14 @@ func TestJobEndpoint_Revert_ACL(t *testing.T) { revertReq.AuthToken = invalidToken.SecretID var invalidResp structs.JobRegisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &invalidResp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Fetch the response with a valid management token revertReq.AuthToken = root.SecretID var validResp structs.JobRegisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &validResp) - assert.Nil(err) + require.Nil(err) // Try with a valid non-management token validToken := mock.CreatePolicyAndToken(t, state, 1003, "test-valid", @@ -1086,7 +1086,7 @@ func TestJobEndpoint_Revert_ACL(t *testing.T) { revertReq.AuthToken = validToken.SecretID var validResp2 structs.JobRegisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Revert", revertReq, &validResp2) - assert.Nil(err) + require.Nil(err) } func TestJobEndpoint_Stable(t *testing.T) { @@ -1154,7 +1154,7 @@ func TestJobEndpoint_Stable(t *testing.T) { func TestJobEndpoint_Stable_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) s1, root := TestACLServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -1167,7 +1167,7 @@ func TestJobEndpoint_Stable_ACL(t *testing.T) { // Register the job job := mock.Job() err := state.UpsertJob(1000, job) - assert.Nil(err) + require.Nil(err) // Create stability request stableReq := &structs.JobStabilityRequest{ @@ -1183,8 +1183,8 @@ func TestJobEndpoint_Stable_ACL(t *testing.T) { // Attempt to fetch the token without a token var stableResp structs.JobStabilityResponse err = msgpackrpc.CallWithCodec(codec, "Job.Stable", stableReq, &stableResp) - assert.NotNil(err) - assert.Contains("Permission denied", err.Error()) + require.NotNil(err) + require.Contains("Permission denied", err.Error()) // Expect failure for request with an invalid token invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", @@ -1193,14 +1193,14 @@ func TestJobEndpoint_Stable_ACL(t *testing.T) { stableReq.AuthToken = invalidToken.SecretID var invalidStableResp structs.JobStabilityResponse err = msgpackrpc.CallWithCodec(codec, "Job.Stable", stableReq, &invalidStableResp) - assert.NotNil(err) - assert.Contains("Permission denied", err.Error()) + require.NotNil(err) + require.Contains("Permission denied", err.Error()) // Attempt to fetch with a management token stableReq.AuthToken = root.SecretID var validStableResp structs.JobStabilityResponse err = msgpackrpc.CallWithCodec(codec, "Job.Stable", stableReq, &validStableResp) - assert.Nil(err) + require.Nil(err) // Attempt to fetch with a valid token validToken := mock.CreatePolicyAndToken(t, state, 1005, "test-invalid", @@ -1209,14 +1209,14 @@ func TestJobEndpoint_Stable_ACL(t *testing.T) { stableReq.AuthToken = validToken.SecretID var validStableResp2 structs.JobStabilityResponse err = msgpackrpc.CallWithCodec(codec, "Job.Stable", stableReq, &validStableResp2) - assert.Nil(err) + require.Nil(err) // Check that the job is marked stable ws := memdb.NewWatchSet() out, err := state.JobByID(ws, job.Namespace, job.ID) - assert.Nil(err) - assert.NotNil(job) - assert.Equal(true, out.Stable) + require.Nil(err) + require.NotNil(job) + require.Equal(true, out.Stable) } func TestJobEndpoint_Evaluate(t *testing.T) { @@ -1300,7 +1300,7 @@ func TestJobEndpoint_Evaluate(t *testing.T) { func TestJobEndpoint_Evaluate_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) s1, root := TestACLServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -1313,7 +1313,7 @@ func TestJobEndpoint_Evaluate_ACL(t *testing.T) { // Create the job job := mock.Job() err := state.UpsertJob(300, job) - assert.Nil(err) + require.Nil(err) // Force a re-evaluation reEval := &structs.JobEvaluateRequest{ @@ -1327,8 +1327,8 @@ func TestJobEndpoint_Evaluate_ACL(t *testing.T) { // Attempt to fetch the response without a token var resp structs.JobRegisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Evaluate", reEval, &resp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Attempt to fetch the response with an invalid token invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", @@ -1337,14 +1337,14 @@ func TestJobEndpoint_Evaluate_ACL(t *testing.T) { reEval.AuthToken = invalidToken.SecretID var invalidResp structs.JobRegisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Evaluate", reEval, &invalidResp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Fetch the response with a valid management token reEval.AuthToken = root.SecretID var validResp structs.JobRegisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Evaluate", reEval, &validResp) - assert.Nil(err) + require.Nil(err) // Fetch the response with a valid token validToken := mock.CreatePolicyAndToken(t, state, 1005, "test-valid", @@ -1353,21 +1353,21 @@ func TestJobEndpoint_Evaluate_ACL(t *testing.T) { reEval.AuthToken = validToken.SecretID var validResp2 structs.JobRegisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Evaluate", reEval, &validResp2) - assert.Nil(err) + require.Nil(err) // Lookup the evaluation ws := memdb.NewWatchSet() eval, err := state.EvalByID(ws, validResp2.EvalID) - assert.Nil(err) - assert.NotNil(eval) - - assert.Equal(eval.CreateIndex, validResp2.EvalCreateIndex) - assert.Equal(eval.Priority, job.Priority) - assert.Equal(eval.Type, job.Type) - assert.Equal(eval.TriggeredBy, structs.EvalTriggerJobRegister) - assert.Equal(eval.JobID, job.ID) - assert.Equal(eval.JobModifyIndex, validResp2.JobModifyIndex) - assert.Equal(eval.Status, structs.EvalStatusPending) + require.Nil(err) + require.NotNil(eval) + + require.Equal(eval.CreateIndex, validResp2.EvalCreateIndex) + require.Equal(eval.Priority, job.Priority) + require.Equal(eval.Type, job.Type) + require.Equal(eval.TriggeredBy, structs.EvalTriggerJobRegister) + require.Equal(eval.JobID, job.ID) + require.Equal(eval.JobModifyIndex, validResp2.JobModifyIndex) + require.Equal(eval.Status, structs.EvalStatusPending) } func TestJobEndpoint_Evaluate_Periodic(t *testing.T) { @@ -1460,6 +1460,7 @@ func TestJobEndpoint_Evaluate_ParameterizedJob(t *testing.T) { func TestJobEndpoint_Deregister(t *testing.T) { t.Parallel() + require := require.New(t) s1 := TestServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue }) @@ -1467,7 +1468,7 @@ func TestJobEndpoint_Deregister(t *testing.T) { codec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) - // Create the register request + // Create the register requests job := mock.Job() reg := &structs.JobRegisterRequest{ Job: job, @@ -1479,9 +1480,7 @@ func TestJobEndpoint_Deregister(t *testing.T) { // Fetch the response var resp structs.JobRegisterResponse - if err := msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp); err != nil { - t.Fatalf("err: %v", err) - } + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp)) // Deregister but don't purge dereg := &structs.JobDeregisterRequest{ @@ -1493,57 +1492,26 @@ func TestJobEndpoint_Deregister(t *testing.T) { }, } var resp2 structs.JobDeregisterResponse - if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2); err != nil { - t.Fatalf("err: %v", err) - } - if resp2.Index == 0 { - t.Fatalf("bad index: %d", resp2.Index) - } + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg, &resp2)) + require.NotZero(resp2.Index) // Check for the job in the FSM - ws := memdb.NewWatchSet() state := s1.fsm.State() - out, err := state.JobByID(ws, job.Namespace, job.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if out == nil { - t.Fatalf("job purged") - } - if !out.Stop { - t.Fatalf("job not stopped") - } + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.Nil(err) + require.NotNil(out) + require.True(out.Stop) // Lookup the evaluation - eval, err := state.EvalByID(ws, resp2.EvalID) - if err != nil { - t.Fatalf("err: %v", err) - } - if eval == nil { - t.Fatalf("expected eval") - } - if eval.CreateIndex != resp2.EvalCreateIndex { - t.Fatalf("index mis-match") - } - - if eval.Priority != structs.JobDefaultPriority { - t.Fatalf("bad: %#v", eval) - } - if eval.Type != structs.JobTypeService { - t.Fatalf("bad: %#v", eval) - } - if eval.TriggeredBy != structs.EvalTriggerJobDeregister { - t.Fatalf("bad: %#v", eval) - } - if eval.JobID != job.ID { - t.Fatalf("bad: %#v", eval) - } - if eval.JobModifyIndex != resp2.JobModifyIndex { - t.Fatalf("bad: %#v", eval) - } - if eval.Status != structs.EvalStatusPending { - t.Fatalf("bad: %#v", eval) - } + eval, err := state.EvalByID(nil, resp2.EvalID) + require.Nil(err) + require.NotNil(eval) + require.EqualValues(resp2.EvalCreateIndex, eval.CreateIndex) + require.Equal(job.Priority, eval.Priority) + require.Equal(job.Type, eval.Type) + require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy) + require.Equal(job.ID, eval.JobID) + require.Equal(structs.EvalStatusPending, eval.Status) // Deregister and purge dereg2 := &structs.JobDeregisterRequest{ @@ -1555,57 +1523,30 @@ func TestJobEndpoint_Deregister(t *testing.T) { }, } var resp3 structs.JobDeregisterResponse - if err := msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg2, &resp3); err != nil { - t.Fatalf("err: %v", err) - } - if resp3.Index == 0 { - t.Fatalf("bad index: %d", resp3.Index) - } + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deregister", dereg2, &resp3)) + require.NotZero(resp3.Index) // Check for the job in the FSM - out, err = state.JobByID(ws, job.Namespace, job.ID) - if err != nil { - t.Fatalf("err: %v", err) - } - if out != nil { - t.Fatalf("unexpected job") - } + out, err = state.JobByID(nil, job.Namespace, job.ID) + require.Nil(err) + require.Nil(out) // Lookup the evaluation - eval, err = state.EvalByID(ws, resp3.EvalID) - if err != nil { - t.Fatalf("err: %v", err) - } - if eval == nil { - t.Fatalf("expected eval") - } - if eval.CreateIndex != resp3.EvalCreateIndex { - t.Fatalf("index mis-match") - } - - if eval.Priority != structs.JobDefaultPriority { - t.Fatalf("bad: %#v", eval) - } - if eval.Type != structs.JobTypeService { - t.Fatalf("bad: %#v", eval) - } - if eval.TriggeredBy != structs.EvalTriggerJobDeregister { - t.Fatalf("bad: %#v", eval) - } - if eval.JobID != job.ID { - t.Fatalf("bad: %#v", eval) - } - if eval.JobModifyIndex != resp3.JobModifyIndex { - t.Fatalf("bad: %#v", eval) - } - if eval.Status != structs.EvalStatusPending { - t.Fatalf("bad: %#v", eval) - } + eval, err = state.EvalByID(nil, resp3.EvalID) + require.Nil(err) + require.NotNil(eval) + + require.EqualValues(resp3.EvalCreateIndex, eval.CreateIndex) + require.Equal(job.Priority, eval.Priority) + require.Equal(job.Type, eval.Type) + require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy) + require.Equal(job.ID, eval.JobID) + require.Equal(structs.EvalStatusPending, eval.Status) } func TestJobEndpoint_Deregister_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) s1, root := TestACLServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -1632,8 +1573,8 @@ func TestJobEndpoint_Deregister_ACL(t *testing.T) { // Expect failure for request without a token var resp structs.JobDeregisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", req, &resp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Expect failure for request with an invalid token invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", @@ -1642,16 +1583,16 @@ func TestJobEndpoint_Deregister_ACL(t *testing.T) { var invalidResp structs.JobDeregisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", req, &invalidResp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Expect success with a valid management token req.AuthToken = root.SecretID var validResp structs.JobDeregisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", req, &validResp) - assert.Nil(err) - assert.NotEqual(validResp.Index, 0) + require.Nil(err) + require.NotEqual(validResp.Index, 0) // Expect success with a valid token validToken := mock.CreatePolicyAndToken(t, state, 1005, "test-valid", @@ -1660,28 +1601,26 @@ func TestJobEndpoint_Deregister_ACL(t *testing.T) { var validResp2 structs.JobDeregisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Deregister", req, &validResp2) - assert.Nil(err) - assert.NotEqual(validResp2.Index, 0) - - ws := memdb.NewWatchSet() + require.Nil(err) + require.NotEqual(validResp2.Index, 0) // Check for the job in the FSM - out, err := state.JobByID(ws, job.Namespace, job.ID) - assert.Nil(err) - assert.Nil(out) + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.Nil(err) + require.Nil(out) // Lookup the evaluation - eval, err := state.EvalByID(ws, validResp2.EvalID) - assert.Nil(err) - assert.NotNil(eval, nil) - - assert.Equal(eval.CreateIndex, validResp2.EvalCreateIndex) - assert.Equal(eval.Priority, structs.JobDefaultPriority) - assert.Equal(eval.Type, structs.JobTypeService) - assert.Equal(eval.TriggeredBy, structs.EvalTriggerJobDeregister) - assert.Equal(eval.JobID, job.ID) - assert.Equal(eval.JobModifyIndex, validResp2.JobModifyIndex) - assert.Equal(eval.Status, structs.EvalStatusPending) + eval, err := state.EvalByID(nil, validResp2.EvalID) + require.Nil(err) + require.NotNil(eval, nil) + + require.Equal(eval.CreateIndex, validResp2.EvalCreateIndex) + require.Equal(eval.Priority, structs.JobDefaultPriority) + require.Equal(eval.Type, structs.JobTypeService) + require.Equal(eval.TriggeredBy, structs.EvalTriggerJobDeregister) + require.Equal(eval.JobID, job.ID) + require.Equal(eval.JobModifyIndex, validResp2.JobModifyIndex) + require.Equal(eval.Status, structs.EvalStatusPending) } func TestJobEndpoint_Deregister_Nonexistent(t *testing.T) { @@ -1862,6 +1801,165 @@ func TestJobEndpoint_Deregister_ParameterizedJob(t *testing.T) { } } +func TestJobEndpoint_BatchDeregister(t *testing.T) { + t.Parallel() + require := require.New(t) + s1 := TestServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + + // Create the register requests + job := mock.Job() + reg := &structs.JobRegisterRequest{ + Job: job, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + + // Fetch the response + var resp structs.JobRegisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", reg, &resp)) + + job2 := mock.Job() + job2.Priority = 1 + reg2 := &structs.JobRegisterRequest{ + Job: job2, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job2.Namespace, + }, + } + + // Fetch the response + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Register", reg2, &resp)) + + // Deregister + dereg := &structs.JobBatchDeregisterRequest{ + Jobs: map[structs.NamespacedID]*structs.JobDeregisterOptions{ + structs.NamespacedID{ + ID: job.ID, + Namespace: job.Namespace, + }: &structs.JobDeregisterOptions{}, + structs.NamespacedID{ + ID: job2.ID, + Namespace: job2.Namespace, + }: &structs.JobDeregisterOptions{ + Purge: true, + }, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + Namespace: job.Namespace, + }, + } + var resp2 structs.JobBatchDeregisterResponse + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.BatchDeregister", dereg, &resp2)) + require.NotZero(resp2.Index) + + // Check for the job in the FSM + state := s1.fsm.State() + out, err := state.JobByID(nil, job.Namespace, job.ID) + require.Nil(err) + require.NotNil(out) + require.True(out.Stop) + + out, err = state.JobByID(nil, job2.Namespace, job2.ID) + require.Nil(err) + require.Nil(out) + + // Lookup the evaluation + for jobNS, eval := range resp2.JobEvals { + expectedJob := job + if jobNS.ID != job.ID { + expectedJob = job2 + } + + eval, err := state.EvalByID(nil, eval) + require.Nil(err) + require.NotNil(eval) + require.EqualValues(resp2.Index, eval.CreateIndex) + require.Equal(expectedJob.Priority, eval.Priority) + require.Equal(expectedJob.Type, eval.Type) + require.Equal(structs.EvalTriggerJobDeregister, eval.TriggeredBy) + require.Equal(expectedJob.ID, eval.JobID) + require.Equal(structs.EvalStatusPending, eval.Status) + } +} + +func TestJobEndpoint_BatchDeregister_ACL(t *testing.T) { + t.Parallel() + require := require.New(t) + + s1, root := TestACLServer(t, func(c *Config) { + c.NumSchedulers = 0 // Prevent automatic dequeue + }) + defer s1.Shutdown() + codec := rpcClient(t, s1) + testutil.WaitForLeader(t, s1.RPC) + state := s1.fsm.State() + + // Create and register a job + job, job2 := mock.Job(), mock.Job() + require.Nil(state.UpsertJob(100, job)) + require.Nil(state.UpsertJob(101, job2)) + + // Deregister + req := &structs.JobBatchDeregisterRequest{ + Jobs: map[structs.NamespacedID]*structs.JobDeregisterOptions{ + structs.NamespacedID{ + ID: job.ID, + Namespace: job.Namespace, + }: &structs.JobDeregisterOptions{}, + structs.NamespacedID{ + ID: job2.ID, + Namespace: job2.Namespace, + }: &structs.JobDeregisterOptions{}, + }, + WriteRequest: structs.WriteRequest{ + Region: "global", + }, + } + + // Expect failure for request without a token + var resp structs.JobBatchDeregisterResponse + err := msgpackrpc.CallWithCodec(codec, "Job.BatchDeregister", req, &resp) + require.NotNil(err) + require.True(structs.IsErrPermissionDenied(err)) + + // Expect failure for request with an invalid token + invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilityListJobs})) + req.AuthToken = invalidToken.SecretID + + var invalidResp structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.BatchDeregister", req, &invalidResp) + require.NotNil(err) + require.True(structs.IsErrPermissionDenied(err)) + + // Expect success with a valid management token + req.AuthToken = root.SecretID + + var validResp structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.BatchDeregister", req, &validResp) + require.Nil(err) + require.NotEqual(validResp.Index, 0) + + // Expect success with a valid token + validToken := mock.CreatePolicyAndToken(t, state, 1005, "test-valid", + mock.NamespacePolicy(structs.DefaultNamespace, "", []string{acl.NamespaceCapabilitySubmitJob})) + req.AuthToken = validToken.SecretID + + var validResp2 structs.JobDeregisterResponse + err = msgpackrpc.CallWithCodec(codec, "Job.BatchDeregister", req, &validResp2) + require.Nil(err) + require.NotEqual(validResp2.Index, 0) +} + func TestJobEndpoint_GetJob(t *testing.T) { t.Parallel() s1 := TestServer(t, nil) @@ -1941,7 +2039,7 @@ func TestJobEndpoint_GetJob(t *testing.T) { func TestJobEndpoint_GetJob_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) s1, root := TestACLServer(t, nil) defer s1.Shutdown() @@ -1952,7 +2050,7 @@ func TestJobEndpoint_GetJob_ACL(t *testing.T) { // Create the job job := mock.Job() err := state.UpsertJob(1000, job) - assert.Nil(err) + require.Nil(err) // Lookup the job get := &structs.JobSpecificRequest{ @@ -1966,8 +2064,8 @@ func TestJobEndpoint_GetJob_ACL(t *testing.T) { // Looking up the job without a token should fail var resp structs.SingleJobResponse err = msgpackrpc.CallWithCodec(codec, "Job.GetJob", get, &resp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Expect failure for request with an invalid token invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", @@ -1976,15 +2074,15 @@ func TestJobEndpoint_GetJob_ACL(t *testing.T) { get.AuthToken = invalidToken.SecretID var invalidResp structs.SingleJobResponse err = msgpackrpc.CallWithCodec(codec, "Job.GetJob", get, &invalidResp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Looking up the job with a management token should succeed get.AuthToken = root.SecretID var validResp structs.SingleJobResponse err = msgpackrpc.CallWithCodec(codec, "Job.GetJob", get, &validResp) - assert.Nil(err) - assert.Equal(job.ID, validResp.Job.ID) + require.Nil(err) + require.Equal(job.ID, validResp.Job.ID) // Looking up the job with a valid token should succeed validToken := mock.CreatePolicyAndToken(t, state, 1005, "test-valid", @@ -1993,8 +2091,8 @@ func TestJobEndpoint_GetJob_ACL(t *testing.T) { get.AuthToken = validToken.SecretID var validResp2 structs.SingleJobResponse err = msgpackrpc.CallWithCodec(codec, "Job.GetJob", get, &validResp2) - assert.Nil(err) - assert.Equal(job.ID, validResp2.Job.ID) + require.Nil(err) + require.Equal(job.ID, validResp2.Job.ID) } func TestJobEndpoint_GetJob_Blocking(t *testing.T) { @@ -2147,7 +2245,7 @@ func TestJobEndpoint_GetJobVersions(t *testing.T) { func TestJobEndpoint_GetJobVersions_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) s1, root := TestACLServer(t, nil) defer s1.Shutdown() @@ -2159,11 +2257,11 @@ func TestJobEndpoint_GetJobVersions_ACL(t *testing.T) { job := mock.Job() job.Priority = 88 err := state.UpsertJob(10, job) - assert.Nil(err) + require.Nil(err) job.Priority = 100 err = state.UpsertJob(100, job) - assert.Nil(err) + require.Nil(err) // Lookup the job get := &structs.JobVersionsRequest{ @@ -2177,8 +2275,8 @@ func TestJobEndpoint_GetJobVersions_ACL(t *testing.T) { // Attempt to fetch without a token should fail var resp structs.JobVersionsResponse err = msgpackrpc.CallWithCodec(codec, "Job.GetJobVersions", get, &resp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Expect failure for request with an invalid token invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", @@ -2187,14 +2285,14 @@ func TestJobEndpoint_GetJobVersions_ACL(t *testing.T) { get.AuthToken = invalidToken.SecretID var invalidResp structs.JobVersionsResponse err = msgpackrpc.CallWithCodec(codec, "Job.GetJobVersions", get, &invalidResp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Expect success for request with a valid management token get.AuthToken = root.SecretID var validResp structs.JobVersionsResponse err = msgpackrpc.CallWithCodec(codec, "Job.GetJobVersions", get, &validResp) - assert.Nil(err) + require.Nil(err) // Expect success for request with a valid token validToken := mock.CreatePolicyAndToken(t, state, 1005, "test-valid", @@ -2203,13 +2301,13 @@ func TestJobEndpoint_GetJobVersions_ACL(t *testing.T) { get.AuthToken = validToken.SecretID var validResp2 structs.JobVersionsResponse err = msgpackrpc.CallWithCodec(codec, "Job.GetJobVersions", get, &validResp2) - assert.Nil(err) + require.Nil(err) // Make sure there are two job versions versions := validResp2.Versions - assert.Equal(2, len(versions)) - assert.Equal(versions[0].ID, job.ID) - assert.Equal(versions[1].ID, job.ID) + require.Equal(2, len(versions)) + require.Equal(versions[0].ID, job.ID) + require.Equal(versions[1].ID, job.ID) } func TestJobEndpoint_GetJobVersions_Diff(t *testing.T) { @@ -2455,7 +2553,7 @@ func TestJobEndpoint_GetJobSummary(t *testing.T) { } func TestJobEndpoint_Summary_ACL(t *testing.T) { - assert := assert.New(t) + require := require.New(t) t.Parallel() srv, root := TestACLServer(t, func(c *Config) { @@ -2481,7 +2579,7 @@ func TestJobEndpoint_Summary_ACL(t *testing.T) { // Register the job with a valid token var regResp structs.JobRegisterResponse err = msgpackrpc.CallWithCodec(codec, "Job.Register", reg, ®Resp) - assert.Nil(err) + require.Nil(err) job.CreateIndex = regResp.JobModifyIndex job.ModifyIndex = regResp.JobModifyIndex @@ -2498,7 +2596,7 @@ func TestJobEndpoint_Summary_ACL(t *testing.T) { // Expect failure for request without a token var resp structs.JobSummaryResponse err = msgpackrpc.CallWithCodec(codec, "Job.Summary", req, &resp) - assert.NotNil(err) + require.NotNil(err) expectedJobSummary := &structs.JobSummary{ JobID: job.ID, @@ -2515,8 +2613,8 @@ func TestJobEndpoint_Summary_ACL(t *testing.T) { req.AuthToken = root.SecretID var mgmtResp structs.JobSummaryResponse err = msgpackrpc.CallWithCodec(codec, "Job.Summary", req, &mgmtResp) - assert.Nil(err) - assert.Equal(expectedJobSummary, mgmtResp.JobSummary) + require.Nil(err) + require.Equal(expectedJobSummary, mgmtResp.JobSummary) // Create the namespace policy and tokens state := srv.fsm.State() @@ -2528,7 +2626,7 @@ func TestJobEndpoint_Summary_ACL(t *testing.T) { req.AuthToken = invalidToken.SecretID var invalidResp structs.JobSummaryResponse err = msgpackrpc.CallWithCodec(codec, "Job.Summary", req, &invalidResp) - assert.NotNil(err) + require.NotNil(err) // Try with a valid token validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", @@ -2537,8 +2635,8 @@ func TestJobEndpoint_Summary_ACL(t *testing.T) { req.AuthToken = validToken.SecretID var authResp structs.JobSummaryResponse err = msgpackrpc.CallWithCodec(codec, "Job.Summary", req, &authResp) - assert.Nil(err) - assert.Equal(expectedJobSummary, authResp.JobSummary) + require.Nil(err) + require.Equal(expectedJobSummary, authResp.JobSummary) } func TestJobEndpoint_GetJobSummary_Blocking(t *testing.T) { @@ -2696,7 +2794,7 @@ func TestJobEndpoint_ListJobs(t *testing.T) { } func TestJobEndpoint_ListJobs_WithACL(t *testing.T) { - assert := assert.New(t) + require := require.New(t) t.Parallel() srv, root := TestACLServer(t, func(c *Config) { @@ -2712,7 +2810,7 @@ func TestJobEndpoint_ListJobs_WithACL(t *testing.T) { // Create the register request job := mock.Job() err = state.UpsertJob(1000, job) - assert.Nil(err) + require.Nil(err) req := &structs.JobListRequest{ QueryOptions: structs.QueryOptions{ @@ -2724,15 +2822,15 @@ func TestJobEndpoint_ListJobs_WithACL(t *testing.T) { // Expect failure for request without a token var resp structs.JobListResponse err = msgpackrpc.CallWithCodec(codec, "Job.List", req, &resp) - assert.NotNil(err) + require.NotNil(err) // Expect success for request with a management token var mgmtResp structs.JobListResponse req.AuthToken = root.SecretID err = msgpackrpc.CallWithCodec(codec, "Job.List", req, &mgmtResp) - assert.Nil(err) - assert.Equal(1, len(mgmtResp.Jobs)) - assert.Equal(job.ID, mgmtResp.Jobs[0].ID) + require.Nil(err) + require.Equal(1, len(mgmtResp.Jobs)) + require.Equal(job.ID, mgmtResp.Jobs[0].ID) // Expect failure for request with a token that has incorrect permissions invalidToken := mock.CreatePolicyAndToken(t, state, 1003, "test-invalid", @@ -2741,7 +2839,7 @@ func TestJobEndpoint_ListJobs_WithACL(t *testing.T) { req.AuthToken = invalidToken.SecretID var invalidResp structs.JobListResponse err = msgpackrpc.CallWithCodec(codec, "Job.List", req, &invalidResp) - assert.NotNil(err) + require.NotNil(err) // Try with a valid token with correct permissions validToken := mock.CreatePolicyAndToken(t, state, 1001, "test-valid", @@ -2750,9 +2848,9 @@ func TestJobEndpoint_ListJobs_WithACL(t *testing.T) { req.AuthToken = validToken.SecretID err = msgpackrpc.CallWithCodec(codec, "Job.List", req, &validResp) - assert.Nil(err) - assert.Equal(1, len(validResp.Jobs)) - assert.Equal(job.ID, validResp.Jobs[0].ID) + require.Nil(err) + require.Equal(1, len(validResp.Jobs)) + require.Equal(job.ID, validResp.Jobs[0].ID) } func TestJobEndpoint_ListJobs_Blocking(t *testing.T) { @@ -2864,7 +2962,7 @@ func TestJobEndpoint_Allocations(t *testing.T) { func TestJobEndpoint_Allocations_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) s1, root := TestACLServer(t, nil) defer s1.Shutdown() @@ -2880,7 +2978,7 @@ func TestJobEndpoint_Allocations_ACL(t *testing.T) { state.UpsertJobSummary(999, mock.JobSummary(alloc2.JobID)) err := state.UpsertAllocs(1000, []*structs.Allocation{alloc1, alloc2}) - assert.Nil(err) + require.Nil(err) // Look up allocations for that job get := &structs.JobSpecificRequest{ @@ -2894,8 +2992,8 @@ func TestJobEndpoint_Allocations_ACL(t *testing.T) { // Attempt to fetch the response without a token should fail var resp structs.JobAllocationsResponse err = msgpackrpc.CallWithCodec(codec, "Job.Allocations", get, &resp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Attempt to fetch the response with an invalid token should fail invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", @@ -2904,14 +3002,14 @@ func TestJobEndpoint_Allocations_ACL(t *testing.T) { get.AuthToken = invalidToken.SecretID var invalidResp structs.JobAllocationsResponse err = msgpackrpc.CallWithCodec(codec, "Job.Allocations", get, &invalidResp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Attempt to fetch the response with valid management token should succeed get.AuthToken = root.SecretID var validResp structs.JobAllocationsResponse err = msgpackrpc.CallWithCodec(codec, "Job.Allocations", get, &validResp) - assert.Nil(err) + require.Nil(err) // Attempt to fetch the response with valid management token should succeed validToken := mock.CreatePolicyAndToken(t, state, 1005, "test-valid", @@ -2920,9 +3018,9 @@ func TestJobEndpoint_Allocations_ACL(t *testing.T) { get.AuthToken = validToken.SecretID var validResp2 structs.JobAllocationsResponse err = msgpackrpc.CallWithCodec(codec, "Job.Allocations", get, &validResp2) - assert.Nil(err) + require.Nil(err) - assert.Equal(2, len(validResp2.Allocations)) + require.Equal(2, len(validResp2.Allocations)) } func TestJobEndpoint_Allocations_Blocking(t *testing.T) { @@ -3023,7 +3121,7 @@ func TestJobEndpoint_Evaluations(t *testing.T) { func TestJobEndpoint_Evaluations_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) s1, root := TestACLServer(t, nil) defer s1.Shutdown() @@ -3037,7 +3135,7 @@ func TestJobEndpoint_Evaluations_ACL(t *testing.T) { eval2.JobID = eval1.JobID err := state.UpsertEvals(1000, []*structs.Evaluation{eval1, eval2}) - assert.Nil(err) + require.Nil(err) // Lookup the jobs get := &structs.JobSpecificRequest{ @@ -3051,8 +3149,8 @@ func TestJobEndpoint_Evaluations_ACL(t *testing.T) { // Attempt to fetch without providing a token var resp structs.JobEvaluationsResponse err = msgpackrpc.CallWithCodec(codec, "Job.Evaluations", get, &resp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Attempt to fetch the response with an invalid token invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", @@ -3061,15 +3159,15 @@ func TestJobEndpoint_Evaluations_ACL(t *testing.T) { get.AuthToken = invalidToken.SecretID var invalidResp structs.JobEvaluationsResponse err = msgpackrpc.CallWithCodec(codec, "Job.Evaluations", get, &invalidResp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Attempt to fetch with valid management token should succeed get.AuthToken = root.SecretID var validResp structs.JobEvaluationsResponse err = msgpackrpc.CallWithCodec(codec, "Job.Evaluations", get, &validResp) - assert.Nil(err) - assert.Equal(2, len(validResp.Evaluations)) + require.Nil(err) + require.Equal(2, len(validResp.Evaluations)) // Attempt to fetch with valid token should succeed validToken := mock.CreatePolicyAndToken(t, state, 1003, "test-valid", @@ -3078,8 +3176,8 @@ func TestJobEndpoint_Evaluations_ACL(t *testing.T) { get.AuthToken = validToken.SecretID var validResp2 structs.JobEvaluationsResponse err = msgpackrpc.CallWithCodec(codec, "Job.Evaluations", get, &validResp2) - assert.Nil(err) - assert.Equal(2, len(validResp2.Evaluations)) + require.Nil(err) + require.Equal(2, len(validResp2.Evaluations)) } func TestJobEndpoint_Evaluations_Blocking(t *testing.T) { @@ -3144,7 +3242,7 @@ func TestJobEndpoint_Deployments(t *testing.T) { codec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) state := s1.fsm.State() - assert := assert.New(t) + require := require.New(t) // Create the register request j := mock.Job() @@ -3152,9 +3250,9 @@ func TestJobEndpoint_Deployments(t *testing.T) { d2 := mock.Deployment() d1.JobID = j.ID d2.JobID = j.ID - assert.Nil(state.UpsertJob(1000, j), "UpsertJob") - assert.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment") - assert.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment") + require.Nil(state.UpsertJob(1000, j), "UpsertJob") + require.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment") + require.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment") // Lookup the jobs get := &structs.JobSpecificRequest{ @@ -3165,14 +3263,14 @@ func TestJobEndpoint_Deployments(t *testing.T) { }, } var resp structs.DeploymentListResponse - assert.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deployments", get, &resp), "RPC") - assert.EqualValues(1002, resp.Index, "response index") - assert.Len(resp.Deployments, 2, "deployments for job") + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deployments", get, &resp), "RPC") + require.EqualValues(1002, resp.Index, "response index") + require.Len(resp.Deployments, 2, "deployments for job") } func TestJobEndpoint_Deployments_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) s1, root := TestACLServer(t, nil) defer s1.Shutdown() @@ -3186,9 +3284,9 @@ func TestJobEndpoint_Deployments_ACL(t *testing.T) { d2 := mock.Deployment() d1.JobID = j.ID d2.JobID = j.ID - assert.Nil(state.UpsertJob(1000, j), "UpsertJob") - assert.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment") - assert.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment") + require.Nil(state.UpsertJob(1000, j), "UpsertJob") + require.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment") + require.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment") // Lookup the jobs get := &structs.JobSpecificRequest{ @@ -3201,8 +3299,8 @@ func TestJobEndpoint_Deployments_ACL(t *testing.T) { // Lookup with no token should fail var resp structs.DeploymentListResponse err := msgpackrpc.CallWithCodec(codec, "Job.Deployments", get, &resp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Attempt to fetch the response with an invalid token invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", @@ -3211,15 +3309,15 @@ func TestJobEndpoint_Deployments_ACL(t *testing.T) { get.AuthToken = invalidToken.SecretID var invalidResp structs.DeploymentListResponse err = msgpackrpc.CallWithCodec(codec, "Job.Deployments", get, &invalidResp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Lookup with valid management token should succeed get.AuthToken = root.SecretID var validResp structs.DeploymentListResponse - assert.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deployments", get, &validResp), "RPC") - assert.EqualValues(1002, validResp.Index, "response index") - assert.Len(validResp.Deployments, 2, "deployments for job") + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deployments", get, &validResp), "RPC") + require.EqualValues(1002, validResp.Index, "response index") + require.Len(validResp.Deployments, 2, "deployments for job") // Lookup with valid token should succeed validToken := mock.CreatePolicyAndToken(t, state, 1005, "test-valid", @@ -3227,9 +3325,9 @@ func TestJobEndpoint_Deployments_ACL(t *testing.T) { get.AuthToken = validToken.SecretID var validResp2 structs.DeploymentListResponse - assert.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deployments", get, &validResp2), "RPC") - assert.EqualValues(1002, validResp2.Index, "response index") - assert.Len(validResp2.Deployments, 2, "deployments for job") + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deployments", get, &validResp2), "RPC") + require.EqualValues(1002, validResp2.Index, "response index") + require.Len(validResp2.Deployments, 2, "deployments for job") } func TestJobEndpoint_Deployments_Blocking(t *testing.T) { @@ -3239,23 +3337,23 @@ func TestJobEndpoint_Deployments_Blocking(t *testing.T) { codec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) state := s1.fsm.State() - assert := assert.New(t) + require := require.New(t) // Create the register request j := mock.Job() d1 := mock.Deployment() d2 := mock.Deployment() d2.JobID = j.ID - assert.Nil(state.UpsertJob(50, j), "UpsertJob") + require.Nil(state.UpsertJob(50, j), "UpsertJob") // First upsert an unrelated eval time.AfterFunc(100*time.Millisecond, func() { - assert.Nil(state.UpsertDeployment(100, d1), "UpsertDeployment") + require.Nil(state.UpsertDeployment(100, d1), "UpsertDeployment") }) // Upsert an eval for the job we are interested in later time.AfterFunc(200*time.Millisecond, func() { - assert.Nil(state.UpsertDeployment(200, d2), "UpsertDeployment") + require.Nil(state.UpsertDeployment(200, d2), "UpsertDeployment") }) // Lookup the jobs @@ -3269,10 +3367,10 @@ func TestJobEndpoint_Deployments_Blocking(t *testing.T) { } var resp structs.DeploymentListResponse start := time.Now() - assert.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deployments", get, &resp), "RPC") - assert.EqualValues(200, resp.Index, "response index") - assert.Len(resp.Deployments, 1, "deployments for job") - assert.Equal(d2.ID, resp.Deployments[0].ID, "returned deployment") + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.Deployments", get, &resp), "RPC") + require.EqualValues(200, resp.Index, "response index") + require.Len(resp.Deployments, 1, "deployments for job") + require.Equal(d2.ID, resp.Deployments[0].ID, "returned deployment") if elapsed := time.Since(start); elapsed < 200*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } @@ -3285,7 +3383,7 @@ func TestJobEndpoint_LatestDeployment(t *testing.T) { codec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) state := s1.fsm.State() - assert := assert.New(t) + require := require.New(t) // Create the register request j := mock.Job() @@ -3295,9 +3393,9 @@ func TestJobEndpoint_LatestDeployment(t *testing.T) { d2.JobID = j.ID d2.CreateIndex = d1.CreateIndex + 100 d2.ModifyIndex = d2.CreateIndex + 100 - assert.Nil(state.UpsertJob(1000, j), "UpsertJob") - assert.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment") - assert.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment") + require.Nil(state.UpsertJob(1000, j), "UpsertJob") + require.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment") + require.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment") // Lookup the jobs get := &structs.JobSpecificRequest{ @@ -3308,15 +3406,15 @@ func TestJobEndpoint_LatestDeployment(t *testing.T) { }, } var resp structs.SingleDeploymentResponse - assert.Nil(msgpackrpc.CallWithCodec(codec, "Job.LatestDeployment", get, &resp), "RPC") - assert.EqualValues(1002, resp.Index, "response index") - assert.NotNil(resp.Deployment, "want a deployment") - assert.Equal(d2.ID, resp.Deployment.ID, "latest deployment for job") + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.LatestDeployment", get, &resp), "RPC") + require.EqualValues(1002, resp.Index, "response index") + require.NotNil(resp.Deployment, "want a deployment") + require.Equal(d2.ID, resp.Deployment.ID, "latest deployment for job") } func TestJobEndpoint_LatestDeployment_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) s1, root := TestACLServer(t, nil) defer s1.Shutdown() @@ -3332,9 +3430,9 @@ func TestJobEndpoint_LatestDeployment_ACL(t *testing.T) { d2.JobID = j.ID d2.CreateIndex = d1.CreateIndex + 100 d2.ModifyIndex = d2.CreateIndex + 100 - assert.Nil(state.UpsertJob(1000, j), "UpsertJob") - assert.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment") - assert.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment") + require.Nil(state.UpsertJob(1000, j), "UpsertJob") + require.Nil(state.UpsertDeployment(1001, d1), "UpsertDeployment") + require.Nil(state.UpsertDeployment(1002, d2), "UpsertDeployment") // Lookup the jobs get := &structs.JobSpecificRequest{ @@ -3348,8 +3446,8 @@ func TestJobEndpoint_LatestDeployment_ACL(t *testing.T) { // Attempt to fetch the response without a token should fail var resp structs.SingleDeploymentResponse err := msgpackrpc.CallWithCodec(codec, "Job.LatestDeployment", get, &resp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Attempt to fetch the response with an invalid token should fail invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", @@ -3358,16 +3456,16 @@ func TestJobEndpoint_LatestDeployment_ACL(t *testing.T) { get.AuthToken = invalidToken.SecretID var invalidResp structs.SingleDeploymentResponse err = msgpackrpc.CallWithCodec(codec, "Job.LatestDeployment", get, &invalidResp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Fetching latest deployment with a valid management token should succeed get.AuthToken = root.SecretID var validResp structs.SingleDeploymentResponse - assert.Nil(msgpackrpc.CallWithCodec(codec, "Job.LatestDeployment", get, &validResp), "RPC") - assert.EqualValues(1002, validResp.Index, "response index") - assert.NotNil(validResp.Deployment, "want a deployment") - assert.Equal(d2.ID, validResp.Deployment.ID, "latest deployment for job") + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.LatestDeployment", get, &validResp), "RPC") + require.EqualValues(1002, validResp.Index, "response index") + require.NotNil(validResp.Deployment, "want a deployment") + require.Equal(d2.ID, validResp.Deployment.ID, "latest deployment for job") // Fetching latest deployment with a valid token should succeed validToken := mock.CreatePolicyAndToken(t, state, 1004, "test-valid", @@ -3375,10 +3473,10 @@ func TestJobEndpoint_LatestDeployment_ACL(t *testing.T) { get.AuthToken = validToken.SecretID var validResp2 structs.SingleDeploymentResponse - assert.Nil(msgpackrpc.CallWithCodec(codec, "Job.LatestDeployment", get, &validResp2), "RPC") - assert.EqualValues(1002, validResp2.Index, "response index") - assert.NotNil(validResp2.Deployment, "want a deployment") - assert.Equal(d2.ID, validResp2.Deployment.ID, "latest deployment for job") + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.LatestDeployment", get, &validResp2), "RPC") + require.EqualValues(1002, validResp2.Index, "response index") + require.NotNil(validResp2.Deployment, "want a deployment") + require.Equal(d2.ID, validResp2.Deployment.ID, "latest deployment for job") } func TestJobEndpoint_LatestDeployment_Blocking(t *testing.T) { @@ -3388,23 +3486,23 @@ func TestJobEndpoint_LatestDeployment_Blocking(t *testing.T) { codec := rpcClient(t, s1) testutil.WaitForLeader(t, s1.RPC) state := s1.fsm.State() - assert := assert.New(t) + require := require.New(t) // Create the register request j := mock.Job() d1 := mock.Deployment() d2 := mock.Deployment() d2.JobID = j.ID - assert.Nil(state.UpsertJob(50, j), "UpsertJob") + require.Nil(state.UpsertJob(50, j), "UpsertJob") // First upsert an unrelated eval time.AfterFunc(100*time.Millisecond, func() { - assert.Nil(state.UpsertDeployment(100, d1), "UpsertDeployment") + require.Nil(state.UpsertDeployment(100, d1), "UpsertDeployment") }) // Upsert an eval for the job we are interested in later time.AfterFunc(200*time.Millisecond, func() { - assert.Nil(state.UpsertDeployment(200, d2), "UpsertDeployment") + require.Nil(state.UpsertDeployment(200, d2), "UpsertDeployment") }) // Lookup the jobs @@ -3418,10 +3516,10 @@ func TestJobEndpoint_LatestDeployment_Blocking(t *testing.T) { } var resp structs.SingleDeploymentResponse start := time.Now() - assert.Nil(msgpackrpc.CallWithCodec(codec, "Job.LatestDeployment", get, &resp), "RPC") - assert.EqualValues(200, resp.Index, "response index") - assert.NotNil(resp.Deployment, "deployment for job") - assert.Equal(d2.ID, resp.Deployment.ID, "returned deployment") + require.Nil(msgpackrpc.CallWithCodec(codec, "Job.LatestDeployment", get, &resp), "RPC") + require.EqualValues(200, resp.Index, "response index") + require.NotNil(resp.Deployment, "deployment for job") + require.Equal(d2.ID, resp.Deployment.ID, "returned deployment") if elapsed := time.Since(start); elapsed < 200*time.Millisecond { t.Fatalf("should block (returned in %s) %#v", elapsed, resp) } @@ -3749,7 +3847,7 @@ func TestJobEndpoint_ValidateJob_InvalidSignals(t *testing.T) { } func TestJobEndpoint_ValidateJob_KillSignal(t *testing.T) { - assert := assert.New(t) + require := require.New(t) t.Parallel() // test validate fails if the driver does not support sending signals, but a @@ -3760,9 +3858,9 @@ func TestJobEndpoint_ValidateJob_KillSignal(t *testing.T) { job.TaskGroups[0].Tasks[0].KillSignal = "SIGINT" err, warnings := validateJob(job) - assert.NotNil(err) - assert.True(strings.Contains(err.Error(), "support sending signals")) - assert.Nil(warnings) + require.NotNil(err) + require.True(strings.Contains(err.Error(), "support sending signals")) + require.Nil(warnings) } // test validate succeeds if the driver does support sending signals, and @@ -3772,8 +3870,8 @@ func TestJobEndpoint_ValidateJob_KillSignal(t *testing.T) { job.TaskGroups[0].Tasks[0].KillSignal = "SIGINT" err, warnings := validateJob(job) - assert.Nil(err) - assert.Nil(warnings) + require.Nil(err) + require.Nil(warnings) } } @@ -3812,7 +3910,7 @@ func TestJobEndpoint_ValidateJobUpdate(t *testing.T) { func TestJobEndpoint_ValidateJobUpdate_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) s1, root := TestACLServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -3834,21 +3932,21 @@ func TestJobEndpoint_ValidateJobUpdate_ACL(t *testing.T) { // Attempt to update without providing a valid token var resp structs.JobValidateResponse err := msgpackrpc.CallWithCodec(codec, "Job.Validate", req, &resp) - assert.NotNil(err) + require.NotNil(err) // Update with a valid token req.AuthToken = root.SecretID var validResp structs.JobValidateResponse err = msgpackrpc.CallWithCodec(codec, "Job.Validate", req, &validResp) - assert.Nil(err) + require.Nil(err) - assert.Equal("", validResp.Error) - assert.Equal("", validResp.Warnings) + require.Equal("", validResp.Error) + require.Equal("", validResp.Warnings) } func TestJobEndpoint_Dispatch_ACL(t *testing.T) { t.Parallel() - assert := assert.New(t) + require := require.New(t) s1, root := TestACLServer(t, func(c *Config) { c.NumSchedulers = 0 // Prevent automatic dequeue @@ -3864,7 +3962,7 @@ func TestJobEndpoint_Dispatch_ACL(t *testing.T) { job.Type = structs.JobTypeBatch job.ParameterizedJob = &structs.ParameterizedJobConfig{} err := state.UpsertJob(400, job) - assert.Nil(err) + require.Nil(err) req := &structs.JobDispatchRequest{ JobID: job.ID, @@ -3877,8 +3975,8 @@ func TestJobEndpoint_Dispatch_ACL(t *testing.T) { // Attempt to fetch the response without a token should fail var resp structs.JobDispatchResponse err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", req, &resp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Attempt to fetch the response with an invalid token should fail invalidToken := mock.CreatePolicyAndToken(t, state, 1001, "test-invalid", @@ -3887,18 +3985,18 @@ func TestJobEndpoint_Dispatch_ACL(t *testing.T) { var invalidResp structs.JobDispatchResponse err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", req, &invalidResp) - assert.NotNil(err) - assert.Contains(err.Error(), "Permission denied") + require.NotNil(err) + require.Contains(err.Error(), "Permission denied") // Dispatch with a valid management token should succeed req.AuthToken = root.SecretID var validResp structs.JobDispatchResponse err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", req, &validResp) - assert.Nil(err) - assert.NotNil(validResp.EvalID) - assert.NotNil(validResp.DispatchedJobID) - assert.NotEqual(validResp.DispatchedJobID, "") + require.Nil(err) + require.NotNil(validResp.EvalID) + require.NotNil(validResp.DispatchedJobID) + require.NotEqual(validResp.DispatchedJobID, "") // Dispatch with a valid token should succeed validToken := mock.CreatePolicyAndToken(t, state, 1003, "test-valid", @@ -3907,22 +4005,22 @@ func TestJobEndpoint_Dispatch_ACL(t *testing.T) { var validResp2 structs.JobDispatchResponse err = msgpackrpc.CallWithCodec(codec, "Job.Dispatch", req, &validResp2) - assert.Nil(err) - assert.NotNil(validResp2.EvalID) - assert.NotNil(validResp2.DispatchedJobID) - assert.NotEqual(validResp2.DispatchedJobID, "") + require.Nil(err) + require.NotNil(validResp2.EvalID) + require.NotNil(validResp2.DispatchedJobID) + require.NotEqual(validResp2.DispatchedJobID, "") ws := memdb.NewWatchSet() out, err := state.JobByID(ws, job.Namespace, validResp2.DispatchedJobID) - assert.Nil(err) - assert.NotNil(out) - assert.Equal(out.ParentID, job.ID) + require.Nil(err) + require.NotNil(out) + require.Equal(out.ParentID, job.ID) // Look up the evaluation eval, err := state.EvalByID(ws, validResp2.EvalID) - assert.Nil(err) - assert.NotNil(eval) - assert.Equal(eval.CreateIndex, validResp2.EvalCreateIndex) + require.Nil(err) + require.NotNil(eval) + require.Equal(eval.CreateIndex, validResp2.EvalCreateIndex) } func TestJobEndpoint_Dispatch(t *testing.T) { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 2999026b618..0d6fd5c6fa8 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -77,6 +77,7 @@ const ( ACLTokenBootstrapRequestType AutopilotRequestType UpsertNodeEventsType + JobBatchDeregisterRequestType ) const ( @@ -379,6 +380,26 @@ type JobDeregisterRequest struct { WriteRequest } +// JobBatchDeregisterRequest is used to batch deregister jobs and upsert +// evaluations. +type JobBatchDeregisterRequest struct { + // Jobs is the set of jobs to deregister + Jobs map[NamespacedID]*JobDeregisterOptions + + // Evals is the set of evaluations to create. + Evals []*Evaluation + + WriteRequest +} + +// JobDeregisterOptions configures how a job is deregistered. +type JobDeregisterOptions struct { + // Purge controls whether the deregister purges the job from the system or + // whether the job is just marked as stopped and will be removed by the + // garbage collector + Purge bool +} + // JobEvaluateRequest is used when we just need to re-evaluate a target job type JobEvaluateRequest struct { JobID string @@ -787,6 +808,13 @@ type JobDeregisterResponse struct { QueryMeta } +// JobBatchDeregisterResponse is used to respond to a batch job deregistration +type JobBatchDeregisterResponse struct { + // JobEvals maps the job to its created evaluation + JobEvals map[NamespacedID]string + QueryMeta +} + // JobValidateResponse is the response from validate request type JobValidateResponse struct { // DriverConfigValidated indicates whether the agent validated the driver