Skip to content

Commit

Permalink
scale: add -check-index to job scale command
Browse files Browse the repository at this point in the history
The RPC handler for scaling a job passes flags to enforce the job modify index
is unchanged when it makes the write to Raft. But its only checking against the
existing job modify index at the time the RPC handler snapshots the state store,
so it can only enforce consistency for its own validation.

In clusters with automated scaling, it would be useful to expose the enforce
index options to the API, so that cluster admins can enforce that scaling only
happens when the job state is consistent with a state they've previously seen in
other API calls. Add this option to the CLI and API and have the RPC handler
check them if asked.

Fixes: #23444
  • Loading branch information
tgross committed Jun 27, 2024
1 parent 863d42b commit 239b0d8
Show file tree
Hide file tree
Showing 10 changed files with 138 additions and 17 deletions.
3 changes: 3 additions & 0 deletions .changelog/23457.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
scaling: Added `-check-index` support to `job scale` command
```
14 changes: 12 additions & 2 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) {
return &resp, qm, nil
}

// Scale is used to retrieve information about a particular
// job given its unique ID.
// Scale is used to scale a job.
func (j *Jobs) Scale(jobID, group string, count *int, message string, error bool, meta map[string]interface{},
q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {

Expand All @@ -242,6 +241,17 @@ func (j *Jobs) Scale(jobID, group string, count *int, message string, error bool
return &resp, qm, nil
}

// ScaleWithRequest is used to scale a job, giving the caller complete control
// over the ScalingRequest
func (j *Jobs) ScaleWithRequest(jobID string, req *ScalingRequest, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
var resp JobRegisterResponse
qm, err := j.client.put(fmt.Sprintf("/v1/job/%s/scale", url.PathEscape(jobID)), req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}

// ScaleStatus is used to retrieve information about a particular
// job given its unique ID.
func (j *Jobs) ScaleStatus(jobID string, q *QueryOptions) (*JobScaleStatusResponse, *QueryMeta, error) {
Expand Down
7 changes: 7 additions & 0 deletions api/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,15 @@ type ScalingRequest struct {
Error bool
Meta map[string]interface{}
WriteRequest

// this is effectively a job update, so we need the ability to override policy.
PolicyOverride bool

// If EnforceIndex is set then the job will only be scaled if the passed
// JobModifyIndex matches the current Jobs index. If the index is zero,
// EnforceIndex is ignored.
EnforceIndex bool
JobModifyIndex uint64
}

// ScalingPolicy is the user-specified API object for an autoscaling policy
Expand Down
2 changes: 2 additions & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,8 @@ func (s *HTTPServer) jobScaleAction(resp http.ResponseWriter, req *http.Request,
Message: args.Message,
Error: args.Error,
Meta: args.Meta,
EnforceIndex: args.EnforceIndex,
JobModifyIndex: args.JobModifyIndex,
}
// parseWriteRequest overrides Namespace, Region and AuthToken
// based on values from the original http request
Expand Down
27 changes: 24 additions & 3 deletions command/job_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/mitchellh/cli"
"github.com/posener/complete"
)
Expand Down Expand Up @@ -49,6 +50,11 @@ General Options:
Scale Options:
-check-index
If set, the job is only scaled if the passed job modify index matches the
server side version. Ignored if value of zero is passed. If a non-zero value
is passed, it ensures that the job is being updated from a known state.
-detach
Return immediately instead of entering monitor mode. After job scaling,
the evaluation ID will be printed to the screen, which can be used to
Expand All @@ -68,8 +74,9 @@ func (j *JobScaleCommand) Synopsis() string {
func (j *JobScaleCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(j.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-detach": complete.PredictNothing,
"-verbose": complete.PredictNothing,
"-check-index": complete.PredictNothing,
"-detach": complete.PredictNothing,
"-verbose": complete.PredictNothing,
})
}

Expand All @@ -79,9 +86,11 @@ func (j *JobScaleCommand) Name() string { return "job scale" }
// Run satisfies the cli.Command Run function.
func (j *JobScaleCommand) Run(args []string) int {
var detach, verbose bool
var checkIndex uint64

flags := j.Meta.FlagSet(j.Name(), FlagSetClient)
flags.Usage = func() { j.Ui.Output(j.Help()) }
flags.Uint64Var(&checkIndex, "check-index", 0, "")
flags.BoolVar(&detach, "detach", false, "")
flags.BoolVar(&verbose, "verbose", false, "")
if err := flags.Parse(args); err != nil {
Expand Down Expand Up @@ -144,7 +153,19 @@ func (j *JobScaleCommand) Run(args []string) int {

// Perform the scaling action.
w := &api.WriteOptions{Namespace: namespace}
resp, _, err := client.Jobs().Scale(jobID, groupString, &count, msg, false, nil, w)
req := &api.ScalingRequest{
Count: pointer.Of(int64(count)),
Target: map[string]string{
"Job": jobID,
"Group": groupString,
},
Message: msg,
PolicyOverride: false,
EnforceIndex: checkIndex > 0,
JobModifyIndex: checkIndex,
}

resp, _, err := client.Jobs().ScaleWithRequest(jobID, req, w)
if err != nil {
j.Ui.Error(fmt.Sprintf("Error submitting scaling request: %s", err))
return 1
Expand Down
8 changes: 8 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,14 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
return structs.NewErrRPCCoded(400, "job scaling blocked due to active deployment")
}

// If EnforceIndex set, check it before trying to apply
if args.EnforceIndex && args.JobModifyIndex != 0 {
if args.JobModifyIndex != job.JobModifyIndex {
return fmt.Errorf("%s %d: job exists with conflicting job modify index: %d",
RegisterEnforceIndexErrPrefix, args.JobModifyIndex, job.JobModifyIndex)
}
}

// Commit the job update
_, jobModifyIndex, err := j.srv.raftApply(
structs.JobRegisterRequestType,
Expand Down
46 changes: 46 additions & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7749,6 +7749,52 @@ func TestJobEndpoint_Scale_DeploymentBlocking(t *testing.T) {
}
}

func TestJobEndpoint_ScaleEnforceIndex(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
store := s1.fsm.State()

job := mock.Job()
originalCount := job.TaskGroups[0].Count
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job)
must.NoError(t, err)

groupName := job.TaskGroups[0].Name
scale := &structs.JobScaleRequest{
JobID: job.ID,
Target: map[string]string{
structs.ScalingTargetGroup: groupName,
},
Count: pointer.Of(int64(originalCount + 1)),
Message: "because of the load",
Meta: map[string]interface{}{
"metrics": map[string]string{
"1": "a",
"2": "b",
},
"other": "value",
},
PolicyOverride: false,
EnforceIndex: true,
JobModifyIndex: 1000000,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp structs.JobRegisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
must.EqError(t, err,
"Enforcing job modify index 1000000: job exists with conflicting job modify index: 1000")

events, _, _ := store.ScalingEventsByJob(nil, job.Namespace, job.ID)
must.Len(t, 0, events[groupName])
}

func TestJobEndpoint_Scale_InformationalEventsShouldNotBeBlocked(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
Expand Down
8 changes: 8 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,8 +859,16 @@ type JobScaleRequest struct {
Message string
Error bool
Meta map[string]interface{}

// PolicyOverride is set when the user is attempting to override any policies
PolicyOverride bool

// If EnforceIndex is set then the job will only be scaled if the passed
// JobModifyIndex matches the current Jobs index. If the index is zero,
// EnforceIndex is ignored.
EnforceIndex bool
JobModifyIndex uint64

WriteRequest
}

Expand Down
35 changes: 23 additions & 12 deletions website/content/api-docs/jobs.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -2389,24 +2389,35 @@ The table below shows this endpoint's support for

- `Count` `(int: <optional>)` - Specifies the new task group count.

- `Target` `(json: required)` - JSON map containing the target of the scaling operation.
Must contain a field `Group` with the name of the task group that is the target of this scaling action.
- `EnforceIndex` `(bool: false)` - If set, the job will only be registered if
the passed `JobModifyIndex` matches the current job's index. If the index is
zero, the register only occurs if the job is new. This paradigm allows
check-and-set style job updating.

- `Message` `(string: <optional>)` - Description of the scale action, persisted as part of the scaling event.
Indicates information or reason for scaling; one of `Message` or `Error` must be provided.
- `Error` `(string: <optional>)` - Description of the scale action, persisted as
part of the scaling event. Indicates an error state preventing scaling; one
of `Message` or `Error` must be provided.

- `Error` `(string: <optional>)` - Description of the scale action, persisted as part of the scaling event.
Indicates an error state preventing scaling; one of `Message` or `Error` must be provided.
- `JobModifyIndex` `(int: 0)` - Specifies the `JobModifyIndex` to enforce the
current job is at.

- `Meta` `(json: <optional>)` - JSON block that is persisted as part of the scaling event.
- `Message` `(string: <optional>)` - Description of the scale action, persisted
as part of the scaling event. Indicates information or reason for scaling;
one of `Message` or `Error` must be provided.

- `PolicyOverride` `(bool: false)` - If set, any soft mandatory Sentinel policies
will be overridden. This allows a job to be scaled when it would be denied
by policy.
- `Meta` `(json: <optional>)` - JSON block that is persisted as part of the scaling event.

- `namespace` `(string: "default")` - Specifies the target namespace. If ACL is
enabled, this value must match a namespace that the token is allowed to
access. This is specified as a query string parameter.
enabled, this value must match a namespace that the token is allowed to
access. This is specified as a query string parameter.

- `PolicyOverride` `(bool: false)` - If set, any soft mandatory Sentinel
policies will be overridden. This allows a job to be scaled when it would be
denied by policy.

- `Target` `(json: required)` - JSON map containing the target of the scaling
operation. Must contain a field `Group` with the name of the task group that
is the target of this scaling action.

### Sample Payload

Expand Down
5 changes: 5 additions & 0 deletions website/content/docs/commands/job/scale.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ not used.

## Scale Options

- `-check-index`: If set, the job is only scaled if the passed job modify index
matches the server side version. Ignored if value of zero is passed. If a
non-zero value is passed, it ensures that the job is being updated from a
known state.

- `-detach`: Return immediately instead of entering monitor mode. After the
scale command is submitted, a new evaluation ID is printed to the screen,
which can be used to examine the evaluation using the [eval status] command.
Expand Down

0 comments on commit 239b0d8

Please sign in to comment.