Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

scale: add -check-index to job scale command #23457

Merged
merged 2 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/23457.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
scaling: Added `-check-index` support to `job scale` command
```
14 changes: 12 additions & 2 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) {
return &resp, qm, nil
}

// Scale is used to retrieve information about a particular
// job given its unique ID.
// Scale is used to scale a job.
func (j *Jobs) Scale(jobID, group string, count *int, message string, error bool, meta map[string]interface{},
q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {

Expand All @@ -242,6 +241,17 @@ func (j *Jobs) Scale(jobID, group string, count *int, message string, error bool
return &resp, qm, nil
}

// ScaleWithRequest is used to scale a job, giving the caller complete control
// over the ScalingRequest
func (j *Jobs) ScaleWithRequest(jobID string, req *ScalingRequest, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
var resp JobRegisterResponse
qm, err := j.client.put(fmt.Sprintf("/v1/job/%s/scale", url.PathEscape(jobID)), req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}

// ScaleStatus is used to retrieve information about a particular
// job given its unique ID.
func (j *Jobs) ScaleStatus(jobID string, q *QueryOptions) (*JobScaleStatusResponse, *QueryMeta, error) {
Expand Down
5 changes: 5 additions & 0 deletions api/scaling.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,13 @@ type ScalingRequest struct {
Error bool
Meta map[string]interface{}
WriteRequest

// this is effectively a job update, so we need the ability to override policy.
PolicyOverride bool

// If JobModifyIndex is set then the job will only be scaled if it matches
// the current Jobs index. The JobModifyIndex is ignored if 0.
JobModifyIndex uint64
}

// ScalingPolicy is the user-specified API object for an autoscaling policy
Expand Down
1 change: 1 addition & 0 deletions command/agent/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,7 @@ func (s *HTTPServer) jobScaleAction(resp http.ResponseWriter, req *http.Request,
Message: args.Message,
Error: args.Error,
Meta: args.Meta,
JobModifyIndex: args.JobModifyIndex,
}
// parseWriteRequest overrides Namespace, Region and AuthToken
// based on values from the original http request
Expand Down
26 changes: 23 additions & 3 deletions command/job_scale.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"time"

"github.com/hashicorp/nomad/api"
"github.com/hashicorp/nomad/helper/pointer"
"github.com/mitchellh/cli"
"github.com/posener/complete"
)
Expand Down Expand Up @@ -49,6 +50,11 @@ General Options:

Scale Options:

-check-index
If set, the job is only scaled if the passed job modify index matches the
server side version. Ignored if value of zero is passed. If a non-zero value
is passed, it ensures that the job is being updated from a known state.

-detach
Return immediately instead of entering monitor mode. After job scaling,
the evaluation ID will be printed to the screen, which can be used to
Expand All @@ -68,8 +74,9 @@ func (j *JobScaleCommand) Synopsis() string {
func (j *JobScaleCommand) AutocompleteFlags() complete.Flags {
return mergeAutocompleteFlags(j.Meta.AutocompleteFlags(FlagSetClient),
complete.Flags{
"-detach": complete.PredictNothing,
"-verbose": complete.PredictNothing,
"-check-index": complete.PredictNothing,
"-detach": complete.PredictNothing,
"-verbose": complete.PredictNothing,
})
}

Expand All @@ -79,9 +86,11 @@ func (j *JobScaleCommand) Name() string { return "job scale" }
// Run satisfies the cli.Command Run function.
func (j *JobScaleCommand) Run(args []string) int {
var detach, verbose bool
var checkIndex uint64

flags := j.Meta.FlagSet(j.Name(), FlagSetClient)
flags.Usage = func() { j.Ui.Output(j.Help()) }
flags.Uint64Var(&checkIndex, "check-index", 0, "")
flags.BoolVar(&detach, "detach", false, "")
flags.BoolVar(&verbose, "verbose", false, "")
if err := flags.Parse(args); err != nil {
Expand Down Expand Up @@ -144,7 +153,18 @@ func (j *JobScaleCommand) Run(args []string) int {

// Perform the scaling action.
w := &api.WriteOptions{Namespace: namespace}
resp, _, err := client.Jobs().Scale(jobID, groupString, &count, msg, false, nil, w)
req := &api.ScalingRequest{
Count: pointer.Of(int64(count)),
Target: map[string]string{
"Job": jobID,
"Group": groupString,
},
Message: msg,
PolicyOverride: false,
JobModifyIndex: checkIndex,
}

resp, _, err := client.Jobs().ScaleWithRequest(jobID, req, w)
if err != nil {
j.Ui.Error(fmt.Sprintf("Error submitting scaling request: %s", err))
return 1
Expand Down
8 changes: 8 additions & 0 deletions nomad/job_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -1094,6 +1094,14 @@ func (j *Job) Scale(args *structs.JobScaleRequest, reply *structs.JobRegisterRes
return structs.NewErrRPCCoded(400, "job scaling blocked due to active deployment")
}

// If JobModifyIndex set, check it before trying to apply
if args.JobModifyIndex > 0 {
if args.JobModifyIndex != job.JobModifyIndex {
return fmt.Errorf("%s %d: job exists with conflicting job modify index: %d",
RegisterEnforceIndexErrPrefix, args.JobModifyIndex, job.JobModifyIndex)
}
}

// Commit the job update
_, jobModifyIndex, err := j.srv.raftApply(
structs.JobRegisterRequestType,
Expand Down
46 changes: 46 additions & 0 deletions nomad/job_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7749,6 +7749,52 @@ func TestJobEndpoint_Scale_DeploymentBlocking(t *testing.T) {
}
}

func TestJobEndpoint_ScaleEnforceIndex(t *testing.T) {
ci.Parallel(t)

s1, cleanupS1 := TestServer(t, nil)
defer cleanupS1()
codec := rpcClient(t, s1)
testutil.WaitForLeader(t, s1.RPC)
store := s1.fsm.State()

job := mock.Job()
originalCount := job.TaskGroups[0].Count
err := store.UpsertJob(structs.MsgTypeTestSetup, 1000, nil, job)
must.NoError(t, err)

groupName := job.TaskGroups[0].Name
scale := &structs.JobScaleRequest{
JobID: job.ID,
Target: map[string]string{
structs.ScalingTargetGroup: groupName,
},
Count: pointer.Of(int64(originalCount + 1)),
Message: "because of the load",
Meta: map[string]interface{}{
"metrics": map[string]string{
"1": "a",
"2": "b",
},
"other": "value",
},
PolicyOverride: false,
EnforceIndex: true,
JobModifyIndex: 1000000,
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
},
}
var resp structs.JobRegisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Scale", scale, &resp)
must.EqError(t, err,
"Enforcing job modify index 1000000: job exists with conflicting job modify index: 1000")

events, _, _ := store.ScalingEventsByJob(nil, job.Namespace, job.ID)
must.Len(t, 0, events[groupName])
}

func TestJobEndpoint_Scale_InformationalEventsShouldNotBeBlocked(t *testing.T) {
ci.Parallel(t)
require := require.New(t)
Expand Down
8 changes: 8 additions & 0 deletions nomad/structs/structs.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,8 +859,16 @@ type JobScaleRequest struct {
Message string
Error bool
Meta map[string]interface{}

// PolicyOverride is set when the user is attempting to override any policies
PolicyOverride bool

// If EnforceIndex is set then the job will only be scaled if the passed
// JobModifyIndex matches the current Jobs index. If the index is zero,
// EnforceIndex is ignored.
EnforceIndex bool
JobModifyIndex uint64

WriteRequest
}

Expand Down
35 changes: 23 additions & 12 deletions website/content/api-docs/jobs.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -2389,24 +2389,35 @@ The table below shows this endpoint's support for

- `Count` `(int: <optional>)` - Specifies the new task group count.

- `Target` `(json: required)` - JSON map containing the target of the scaling operation.
Must contain a field `Group` with the name of the task group that is the target of this scaling action.
- `EnforceIndex` `(bool: false)` - If set, the job will only be registered if
the passed `JobModifyIndex` matches the current job's index. If the index is
zero, the register only occurs if the job is new. This paradigm allows
check-and-set style job updating.

- `Message` `(string: <optional>)` - Description of the scale action, persisted as part of the scaling event.
Indicates information or reason for scaling; one of `Message` or `Error` must be provided.
- `Error` `(string: <optional>)` - Description of the scale action, persisted as
part of the scaling event. Indicates an error state preventing scaling; one
of `Message` or `Error` must be provided.

- `Error` `(string: <optional>)` - Description of the scale action, persisted as part of the scaling event.
Indicates an error state preventing scaling; one of `Message` or `Error` must be provided.
- `JobModifyIndex` `(int: 0)` - Specifies the `JobModifyIndex` to enforce the
current job is at.

- `Meta` `(json: <optional>)` - JSON block that is persisted as part of the scaling event.
- `Message` `(string: <optional>)` - Description of the scale action, persisted
as part of the scaling event. Indicates information or reason for scaling;
one of `Message` or `Error` must be provided.

- `PolicyOverride` `(bool: false)` - If set, any soft mandatory Sentinel policies
will be overridden. This allows a job to be scaled when it would be denied
by policy.
- `Meta` `(json: <optional>)` - JSON block that is persisted as part of the scaling event.

- `namespace` `(string: "default")` - Specifies the target namespace. If ACL is
enabled, this value must match a namespace that the token is allowed to
access. This is specified as a query string parameter.
enabled, this value must match a namespace that the token is allowed to
access. This is specified as a query string parameter.

- `PolicyOverride` `(bool: false)` - If set, any soft mandatory Sentinel
policies will be overridden. This allows a job to be scaled when it would be
denied by policy.

- `Target` `(json: required)` - JSON map containing the target of the scaling
operation. Must contain a field `Group` with the name of the task group that
is the target of this scaling action.

### Sample Payload

Expand Down
5 changes: 5 additions & 0 deletions website/content/docs/commands/job/scale.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ not used.

## Scale Options

- `-check-index`: If set, the job is only scaled if the passed job modify index
matches the server side version. Ignored if value of zero is passed. If a
non-zero value is passed, it ensures that the job is being updated from a
known state.

- `-detach`: Return immediately instead of entering monitor mode. After the
scale command is submitted, a new evaluation ID is printed to the screen,
which can be used to examine the evaluation using the [eval status] command.
Expand Down
Loading