Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scaling API changes #7409

Merged
merged 30 commits into from
Mar 24, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ee1b091
WIP: adding ScalingPolicy to api/structs and state store
cgbaker Jan 15, 2020
0762386
wip: upsert/delete scaling policies on job upsert/delete
cgbaker Jan 16, 2020
e6d75ca
wip: was incorrectly parsing ScalingPolicy
cgbaker Jan 16, 2020
7ba9b94
wip: test for scaling policy parsing
cgbaker Jan 16, 2020
a715eb7
wip: added policy get endpoint, added UUID to policy
cgbaker Jan 16, 2020
8102849
wip: working on job group scaling endpoint
cgbaker Jan 16, 2020
1c9bac9
wip: added job.scale rpc endpoint, needs explicit test (tested via ht…
cgbaker Jan 17, 2020
ef7cb0e
wip: add job scale endpoint in client
lgfa29 Jan 25, 2020
4406668
wip: add GET endpoint for job group scaling target
cgbaker Jan 26, 2020
94381c0
wip: added tests for client methods around group scaling
cgbaker Jan 27, 2020
16472c0
wip: remove PolicyOverride from scaling request
cgbaker Feb 3, 2020
6b9c004
wip: added Enabled to ScalingPolicyListStub, removed JobID from body …
cgbaker Feb 5, 2020
7544aaa
wip: add scaling policies methods to the client
lgfa29 Jan 30, 2020
836acc1
wip: add tests for job scale method
lgfa29 Jan 30, 2020
9756c64
wip: use testify in job scaling tests
lgfa29 Feb 4, 2020
c095f41
wip: removed some commented junk from scaling poc
cgbaker Feb 5, 2020
3b4a1ae
finished refactoring state store, schema, etc
cgbaker Feb 21, 2020
66bf8dd
wip: some tests still failing
cgbaker Mar 18, 2020
9243718
scaling: ensure min and max int64s are in toplevel of block.
jrasell Mar 19, 2020
03eb96a
wip: scaling status return, almost done
cgbaker Mar 20, 2020
4759bc3
finished Job.ScaleStatus RPC, need to work on http endpoint
cgbaker Mar 21, 2020
2d88d57
fixed http endpoints for job.register and job.scalestatus
cgbaker Mar 21, 2020
4156009
scaling api: put api.* objects in agreement with structs.* objects
cgbaker Mar 21, 2020
9292e88
changes to Canonicalize, Validate, and api->struct conversion so that…
cgbaker Mar 22, 2020
f704a49
added new ACL capabilities related to autoscaling:
cgbaker Mar 22, 2020
ac5a166
wip: ACL checking for RPC Job.ScaleStatus
cgbaker Mar 22, 2020
d4f967c
made count optional during job scaling actions
cgbaker Mar 22, 2020
ff652bf
add acl validation to Scaling.ListPolicies and Scaling.GetPolicy
cgbaker Mar 23, 2020
e831ec3
added new int64ToPtr method to api/util to avoid pulling in other pac…
cgbaker Mar 23, 2020
4330fe8
bad conversion between api.ScalingPolicy and structs.ScalingPolicy meant
cgbaker Mar 23, 2020
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 39 additions & 22 deletions acl/policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,11 @@ const (
// The following levels are the only valid values for the `policy = "read"` stanza.
// When policies are merged together, the most privilege is granted, except for deny
// which always takes precedence and supercedes.
PolicyDeny = "deny"
PolicyRead = "read"
PolicyList = "list"
PolicyWrite = "write"
PolicyDeny = "deny"
PolicyRead = "read"
PolicyList = "list"
PolicyWrite = "write"
PolicyAutoscaler = "autoscaler"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A bit of bikeshedding here, but the other policies are named as verbs, so should this be

PolicyAutoscale = "autoscale"

?

)

const (
Expand All @@ -23,22 +24,26 @@ const (
// combined we take the union of all capabilities. If the deny capability is present, it
// takes precedence and overwrites all other capabilities.

NamespaceCapabilityDeny = "deny"
NamespaceCapabilityListJobs = "list-jobs"
NamespaceCapabilityReadJob = "read-job"
NamespaceCapabilitySubmitJob = "submit-job"
NamespaceCapabilityDispatchJob = "dispatch-job"
NamespaceCapabilityReadLogs = "read-logs"
NamespaceCapabilityReadFS = "read-fs"
NamespaceCapabilityAllocExec = "alloc-exec"
NamespaceCapabilityAllocNodeExec = "alloc-node-exec"
NamespaceCapabilityAllocLifecycle = "alloc-lifecycle"
NamespaceCapabilitySentinelOverride = "sentinel-override"
NamespaceCapabilityCSIRegisterPlugin = "csi-register-plugin"
NamespaceCapabilityCSIWriteVolume = "csi-write-volume"
NamespaceCapabilityCSIReadVolume = "csi-read-volume"
NamespaceCapabilityCSIListVolume = "csi-list-volume"
NamespaceCapabilityCSIMountVolume = "csi-mount-volume"
NamespaceCapabilityDeny = "deny"
NamespaceCapabilityListJobs = "list-jobs"
NamespaceCapabilityReadJob = "read-job"
NamespaceCapabilitySubmitJob = "submit-job"
NamespaceCapabilityDispatchJob = "dispatch-job"
NamespaceCapabilityReadLogs = "read-logs"
NamespaceCapabilityReadFS = "read-fs"
NamespaceCapabilityAllocExec = "alloc-exec"
NamespaceCapabilityAllocNodeExec = "alloc-node-exec"
NamespaceCapabilityAllocLifecycle = "alloc-lifecycle"
NamespaceCapabilitySentinelOverride = "sentinel-override"
NamespaceCapabilityCSIRegisterPlugin = "csi-register-plugin"
NamespaceCapabilityCSIWriteVolume = "csi-write-volume"
NamespaceCapabilityCSIReadVolume = "csi-read-volume"
NamespaceCapabilityCSIListVolume = "csi-list-volume"
NamespaceCapabilityCSIMountVolume = "csi-mount-volume"
NamespaceCapabilityListScalingPolicies = "list-scaling-policies"
NamespaceCapabilityReadScalingPolicy = "read-scaling-policy"
NamespaceCapabilityReadJobScaling = "read-job-scaling"
NamespaceCapabilityScaleJob = "scale-job"
)

var (
Expand Down Expand Up @@ -121,7 +126,7 @@ type PluginPolicy struct {
// isPolicyValid makes sure the given string matches one of the valid policies.
func isPolicyValid(policy string) bool {
switch policy {
case PolicyDeny, PolicyRead, PolicyWrite:
case PolicyDeny, PolicyRead, PolicyWrite, PolicyAutoscaler:
return true
default:
return false
Expand All @@ -144,7 +149,8 @@ func isNamespaceCapabilityValid(cap string) bool {
NamespaceCapabilitySubmitJob, NamespaceCapabilityDispatchJob, NamespaceCapabilityReadLogs,
NamespaceCapabilityReadFS, NamespaceCapabilityAllocLifecycle,
NamespaceCapabilityAllocExec, NamespaceCapabilityAllocNodeExec,
NamespaceCapabilityCSIReadVolume, NamespaceCapabilityCSIWriteVolume, NamespaceCapabilityCSIListVolume, NamespaceCapabilityCSIMountVolume, NamespaceCapabilityCSIRegisterPlugin:
NamespaceCapabilityCSIReadVolume, NamespaceCapabilityCSIWriteVolume, NamespaceCapabilityCSIListVolume, NamespaceCapabilityCSIMountVolume, NamespaceCapabilityCSIRegisterPlugin,
NamespaceCapabilityListScalingPolicies, NamespaceCapabilityReadScalingPolicy, NamespaceCapabilityReadJobScaling, NamespaceCapabilityScaleJob:
return true
// Separate the enterprise-only capabilities
case NamespaceCapabilitySentinelOverride:
Expand All @@ -162,9 +168,13 @@ func expandNamespacePolicy(policy string) []string {
NamespaceCapabilityReadJob,
NamespaceCapabilityCSIListVolume,
NamespaceCapabilityCSIReadVolume,
NamespaceCapabilityReadJobScaling,
NamespaceCapabilityListScalingPolicies,
NamespaceCapabilityReadScalingPolicy,
}

write := append(read, []string{
NamespaceCapabilityScaleJob,
NamespaceCapabilitySubmitJob,
NamespaceCapabilityDispatchJob,
NamespaceCapabilityReadLogs,
Expand All @@ -182,6 +192,13 @@ func expandNamespacePolicy(policy string) []string {
return read
case PolicyWrite:
return write
case PolicyAutoscaler:
return []string{
NamespaceCapabilityListScalingPolicies,
NamespaceCapabilityReadScalingPolicy,
NamespaceCapabilityReadJobScaling,
NamespaceCapabilityScaleJob,
}
default:
return nil
}
Expand Down
23 changes: 23 additions & 0 deletions acl/policy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@ func TestParse(t *testing.T) {
NamespaceCapabilityReadJob,
NamespaceCapabilityCSIListVolume,
NamespaceCapabilityCSIReadVolume,
NamespaceCapabilityReadJobScaling,
NamespaceCapabilityListScalingPolicies,
NamespaceCapabilityReadScalingPolicy,
},
},
},
Expand All @@ -48,6 +51,9 @@ func TestParse(t *testing.T) {
namespace "secret" {
capabilities = ["deny", "read-logs"]
}
namespace "autoscaler" {
policy = "autoscaler"
}
agent {
policy = "read"
}
Expand Down Expand Up @@ -75,6 +81,9 @@ func TestParse(t *testing.T) {
NamespaceCapabilityReadJob,
NamespaceCapabilityCSIListVolume,
NamespaceCapabilityCSIReadVolume,
NamespaceCapabilityReadJobScaling,
NamespaceCapabilityListScalingPolicies,
NamespaceCapabilityReadScalingPolicy,
},
},
{
Expand All @@ -85,6 +94,10 @@ func TestParse(t *testing.T) {
NamespaceCapabilityReadJob,
NamespaceCapabilityCSIListVolume,
NamespaceCapabilityCSIReadVolume,
NamespaceCapabilityReadJobScaling,
NamespaceCapabilityListScalingPolicies,
NamespaceCapabilityReadScalingPolicy,
NamespaceCapabilityScaleJob,
NamespaceCapabilitySubmitJob,
NamespaceCapabilityDispatchJob,
NamespaceCapabilityReadLogs,
Expand All @@ -102,6 +115,16 @@ func TestParse(t *testing.T) {
NamespaceCapabilityReadLogs,
},
},
{
Name: "autoscaler",
Policy: PolicyAutoscaler,
Capabilities: []string{
NamespaceCapabilityListScalingPolicies,
NamespaceCapabilityReadScalingPolicy,
NamespaceCapabilityReadJobScaling,
NamespaceCapabilityScaleJob,
},
},
},
Agent: &AgentPolicy{
Policy: PolicyRead,
Expand Down
37 changes: 37 additions & 0 deletions api/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,43 @@ func (j *Jobs) Info(jobID string, q *QueryOptions) (*Job, *QueryMeta, error) {
return &resp, qm, nil
}

// Scale is used to retrieve information about a particular
// job given its unique ID.
func (j *Jobs) Scale(jobID, group string, count *int,
reason, error *string, meta map[string]interface{}, q *WriteOptions) (*JobRegisterResponse, *WriteMeta, error) {
var count64 *int64
if count != nil {
count64 = int64ToPtr(int64(*count))
}
req := &ScalingRequest{
Count: count64,
Target: map[string]string{
"Job": jobID,
"Group": group,
},
Error: error,
Reason: reason,
Meta: meta,
}
var resp JobRegisterResponse
qm, err := j.client.write(fmt.Sprintf("/v1/job/%s/scale", url.PathEscape(jobID)), req, &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}

// ScaleStatus is used to retrieve information about a particular
// job given its unique ID.
func (j *Jobs) ScaleStatus(jobID string, q *QueryOptions) (*JobScaleStatusResponse, *QueryMeta, error) {
var resp JobScaleStatusResponse
qm, err := j.client.query(fmt.Sprintf("/v1/job/%s/scale", url.PathEscape(jobID)), &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}

// Versions is used to retrieve all versions of a particular job given its
// unique ID.
func (j *Jobs) Versions(jobID string, diffs bool, q *QueryOptions) ([]*Job, []*JobDiff, *QueryMeta, error) {
Expand Down
119 changes: 119 additions & 0 deletions api/jobs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -909,6 +909,46 @@ func TestJobs_Info(t *testing.T) {
}
}

func TestJobs_ScaleInvalidAction(t *testing.T) {
t.Parallel()
require := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()

// Check if invalid inputs fail
tests := []struct {
jobID string
group string
value int
want string
}{
{"", "", 1, "404"},
{"i-dont-exist", "", 1, "400"},
{"", "i-dont-exist", 1, "404"},
{"i-dont-exist", "me-neither", 1, "404"},
}
for _, test := range tests {
_, _, err := jobs.Scale(test.jobID, test.group, &test.value, stringToPtr("reason"), nil, nil, nil)
require.Errorf(err, "expected jobs.Scale(%s, %s) to fail", test.jobID, test.group)
require.Containsf(err.Error(), test.want, "jobs.Scale(%s, %s) error doesn't contain %s, got: %s", test.jobID, test.group, test.want, err)
}

// Register test job
job := testJob()
job.ID = stringToPtr("TestJobs_Scale")
_, wm, err := jobs.Register(job, nil)
require.NoError(err)
assertWriteMeta(t, wm)

// Perform a scaling action with bad group name, verify error
_, _, err = jobs.Scale(*job.ID, "incorrect-group-name", intToPtr(2),
stringToPtr("because"), nil, nil, nil)
require.Error(err)
require.Contains(err.Error(), "does not exist")
}

func TestJobs_Versions(t *testing.T) {
t.Parallel()
c, s := makeClient(t, nil, nil)
Expand Down Expand Up @@ -1533,3 +1573,82 @@ func TestJobs_AddSpread(t *testing.T) {
t.Fatalf("expect: %#v, got: %#v", expect, job.Spreads)
}
}

// TestJobs_ScaleAction tests the scale target for task group count
func TestJobs_ScaleAction(t *testing.T) {
t.Parallel()
require := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()

id := "job-id/with\\troublesome:characters\n?&字\000"
job := testJobWithScalingPolicy()
job.ID = &id
groupName := *job.TaskGroups[0].Name
groupCount := *job.TaskGroups[0].Count

// Trying to scale against a target before it exists returns an error
_, _, err := jobs.Scale(id, "missing", intToPtr(groupCount+1), stringToPtr("this won't work"), nil, nil, nil)
require.Error(err)
require.Contains(err.Error(), "not found")

// Register the job
_, wm, err := jobs.Register(job, nil)
require.NoError(err)
assertWriteMeta(t, wm)

// Perform scaling action
newCount := groupCount + 1
resp1, wm, err := jobs.Scale(id, groupName,
intToPtr(newCount), stringToPtr("need more instances"), nil, nil, nil)

require.NoError(err)
require.NotNil(resp1)
require.NotEmpty(resp1.EvalID)
assertWriteMeta(t, wm)

// Query the job again
resp, _, err := jobs.Info(*job.ID, nil)
require.NoError(err)
require.Equal(*resp.TaskGroups[0].Count, newCount)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There should be a test for count as nil since it's a unique use case and to make sure this behaviour doesn't changed by accident.

// TODO: check if reason is stored
}

// TestJobs_ScaleStatus tests the /scale status endpoint for task group count
func TestJobs_ScaleStatus(t *testing.T) {
t.Parallel()

require := require.New(t)

c, s := makeClient(t, nil, nil)
defer s.Stop()
jobs := c.Jobs()

// Trying to retrieve a status before it exists returns an error
id := "job-id/with\\troublesome:characters\n?&字\000"
_, _, err := jobs.ScaleStatus(id, nil)
require.Error(err)
require.Contains(err.Error(), "not found")

// Register the job
job := testJobWithScalingPolicy()
job.ID = &id
groupName := *job.TaskGroups[0].Name
groupCount := *job.TaskGroups[0].Count
_, wm, err := jobs.Register(job, nil)
if err != nil {
t.Fatalf("err: %s", err)
}
assertWriteMeta(t, wm)

// Query the scaling endpoint and verify success
result, qm, err := jobs.ScaleStatus(id, nil)
require.NoError(err)
assertQueryMeta(t, qm)

// Check that the result is what we expect
require.Equal(groupCount, result.TaskGroups[groupName].Desired)
}
Loading