Skip to content

Commit

Permalink
Merge pull request #4865 from hashicorp/b-preemption-config
Browse files Browse the repository at this point in the history
improvements to scheduler config API
  • Loading branch information
Preetha authored Nov 12, 2018
2 parents 7eb67b0 + 3eeb229 commit 3c5762b
Show file tree
Hide file tree
Showing 9 changed files with 283 additions and 101 deletions.
38 changes: 23 additions & 15 deletions api/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,11 +122,19 @@ type SchedulerConfiguration struct {
// SchedulerConfigurationResponse is the response object that wraps SchedulerConfiguration
type SchedulerConfigurationResponse struct {
// SchedulerConfig contains scheduler config options
SchedulerConfig SchedulerConfiguration
SchedulerConfig *SchedulerConfiguration

// CreateIndex/ModifyIndex store the create/modify indexes of this configuration.
CreateIndex uint64
ModifyIndex uint64
QueryMeta
}

// SchedulerSetConfigurationResponse is the response object used
// when updating scheduler configuration
type SchedulerSetConfigurationResponse struct {
// Updated returns whether the config was actually updated
// Only set when the request uses CAS
Updated bool

WriteMeta
}

// PreemptionConfig specifies whether preemption is enabled based on scheduler type
Expand All @@ -137,32 +145,32 @@ type PreemptionConfig struct {
// SchedulerGetConfiguration is used to query the current Scheduler configuration.
func (op *Operator) SchedulerGetConfiguration(q *QueryOptions) (*SchedulerConfigurationResponse, *QueryMeta, error) {
var resp SchedulerConfigurationResponse
qm, err := op.c.query("/v1/operator/scheduler/config", &resp, q)
qm, err := op.c.query("/v1/operator/scheduler/configuration", &resp, q)
if err != nil {
return nil, nil, err
}
return &resp, qm, nil
}

// SchedulerSetConfiguration is used to set the current Scheduler configuration.
func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*WriteMeta, error) {
var out bool
wm, err := op.c.write("/v1/operator/scheduler/config", conf, &out, q)
func (op *Operator) SchedulerSetConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*SchedulerSetConfigurationResponse, *WriteMeta, error) {
var out SchedulerSetConfigurationResponse
wm, err := op.c.write("/v1/operator/scheduler/configuration", conf, &out, q)
if err != nil {
return nil, err
return nil, nil, err
}
return wm, nil
return &out, wm, nil
}

// SchedulerCASConfiguration is used to perform a Check-And-Set update on the
// Scheduler configuration. The ModifyIndex value will be respected. Returns
// true on success or false on failures.
func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (bool, *WriteMeta, error) {
var out bool
wm, err := op.c.write("/v1/operator/scheduler/config?cas="+strconv.FormatUint(conf.ModifyIndex, 10), conf, &out, q)
func (op *Operator) SchedulerCASConfiguration(conf *SchedulerConfiguration, q *WriteOptions) (*SchedulerSetConfigurationResponse, *WriteMeta, error) {
var out SchedulerSetConfigurationResponse
wm, err := op.c.write("/v1/operator/scheduler/configuration?cas="+strconv.FormatUint(conf.ModifyIndex, 10), conf, &out, q)
if err != nil {
return false, nil, err
return nil, nil, err
}

return out, wm, nil
return &out, wm, nil
}
18 changes: 11 additions & 7 deletions api/operator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,10 @@ func TestAPI_OperatorSchedulerGetSetConfiguration(t *testing.T) {

// Change a config setting
newConf := &SchedulerConfiguration{PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false}}
_, err := operator.SchedulerSetConfiguration(newConf, nil)
resp, wm, err := operator.SchedulerSetConfiguration(newConf, nil)
require.Nil(err)
require.NotZero(wm.LastIndex)
require.False(resp.Updated)

config, _, err = operator.SchedulerGetConfiguration(nil)
require.Nil(err)
Expand All @@ -99,21 +101,23 @@ func TestAPI_OperatorSchedulerCASConfiguration(t *testing.T) {
{
newConf := &SchedulerConfiguration{
PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false},
ModifyIndex: config.ModifyIndex - 1,
ModifyIndex: config.SchedulerConfig.ModifyIndex - 1,
}
resp, _, err := operator.SchedulerCASConfiguration(newConf, nil)
resp, wm, err := operator.SchedulerCASConfiguration(newConf, nil)
require.Nil(err)
require.False(resp)
require.NotZero(wm.LastIndex)
require.False(resp.Updated)
}

// Pass a valid ModifyIndex
{
newConf := &SchedulerConfiguration{
PreemptionConfig: PreemptionConfig{SystemSchedulerEnabled: false},
ModifyIndex: config.ModifyIndex,
ModifyIndex: config.SchedulerConfig.ModifyIndex,
}
resp, _, err := operator.SchedulerCASConfiguration(newConf, nil)
resp, wm, err := operator.SchedulerCASConfiguration(newConf, nil)
require.Nil(err)
require.True(resp)
require.NotZero(wm.LastIndex)
require.True(resp.Updated)
}
}
2 changes: 1 addition & 1 deletion command/agent/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ func (s *HTTPServer) registerHandlers(enableDebug bool) {
s.mux.HandleFunc("/v1/system/gc", s.wrap(s.GarbageCollectRequest))
s.mux.HandleFunc("/v1/system/reconcile/summaries", s.wrap(s.ReconcileJobSummaries))

s.mux.HandleFunc("/v1/operator/scheduler/config", s.wrap(s.OperatorSchedulerConfiguration))
s.mux.HandleFunc("/v1/operator/scheduler/configuration", s.wrap(s.OperatorSchedulerConfiguration))

if uiEnabled {
s.mux.Handle("/ui/", http.StripPrefix("/ui/", handleUI(http.FileServer(&UIAssetWrapper{FileSystem: assetFS()}))))
Expand Down
93 changes: 43 additions & 50 deletions command/agent/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,66 +215,59 @@ func (s *HTTPServer) OperatorSchedulerConfiguration(resp http.ResponseWriter, re
// Switch on the method
switch req.Method {
case "GET":
var args structs.GenericRequest
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
return nil, nil
}
return s.schedulerGetConfig(resp, req)

var reply structs.SchedulerConfigurationResponse
if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
return nil, err
}
case "PUT", "POST":
return s.schedulerUpdateConfig(resp, req)

out := api.SchedulerConfiguration{
PreemptionConfig: api.PreemptionConfig{SystemSchedulerEnabled: reply.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled},
CreateIndex: reply.CreateIndex,
ModifyIndex: reply.ModifyIndex,
}

resp := api.SchedulerConfigurationResponse{
SchedulerConfig: out,
CreateIndex: out.CreateIndex,
ModifyIndex: out.ModifyIndex,
}
default:
return nil, CodedError(405, ErrInvalidMethod)
}
}

return resp, nil
func (s *HTTPServer) schedulerGetConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.GenericRequest
if done := s.parse(resp, req, &args.Region, &args.QueryOptions); done {
return nil, nil
}

case "PUT":
var args structs.SchedulerSetConfigRequest
s.parseWriteRequest(req, &args.WriteRequest)
var reply structs.SchedulerConfigurationResponse
if err := s.agent.RPC("Operator.SchedulerGetConfiguration", &args, &reply); err != nil {
return nil, err
}
setMeta(resp, &reply.QueryMeta)

var conf api.SchedulerConfiguration
if err := decodeBody(req, &conf); err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing scheduler config: %v", err))
}
return reply, nil
}

args.Config = structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled},
}
func (s *HTTPServer) schedulerUpdateConfig(resp http.ResponseWriter, req *http.Request) (interface{}, error) {
var args structs.SchedulerSetConfigRequest
s.parseWriteRequest(req, &args.WriteRequest)

// Check for cas value
params := req.URL.Query()
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
if err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err))
}
args.Config.ModifyIndex = casVal
args.CAS = true
}
var conf api.SchedulerConfiguration
if err := decodeBody(req, &conf); err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing scheduler config: %v", err))
}

var reply bool
if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil {
return nil, err
}
args.Config = structs.SchedulerConfiguration{
PreemptionConfig: structs.PreemptionConfig{SystemSchedulerEnabled: conf.PreemptionConfig.SystemSchedulerEnabled},
}

// Only use the out value if this was a CAS
if !args.CAS {
return true, nil
// Check for cas value
params := req.URL.Query()
if _, ok := params["cas"]; ok {
casVal, err := strconv.ParseUint(params.Get("cas"), 10, 64)
if err != nil {
return nil, CodedError(http.StatusBadRequest, fmt.Sprintf("Error parsing cas value: %v", err))
}
return reply, nil
args.Config.ModifyIndex = casVal
args.CAS = true
}

default:
return nil, CodedError(404, ErrInvalidMethod)
var reply structs.SchedulerSetConfigurationResponse
if err := s.agent.RPC("Operator.SchedulerSetConfiguration", &args, &reply); err != nil {
return nil, err
}
setIndex(resp, reply.Index)
return reply, nil
}
38 changes: 26 additions & 12 deletions command/agent/operator_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,12 +264,12 @@ func TestOperator_SchedulerGetConfiguration(t *testing.T) {
httpTest(t, nil, func(s *TestAgent) {
require := require.New(t)
body := bytes.NewBuffer(nil)
req, _ := http.NewRequest("GET", "/v1/operator/scheduler/config", body)
req, _ := http.NewRequest("GET", "/v1/operator/scheduler/configuration", body)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
out, ok := obj.(api.SchedulerConfigurationResponse)
out, ok := obj.(structs.SchedulerConfigurationResponse)
require.True(ok)
require.True(out.SchedulerConfig.PreemptionConfig.SystemSchedulerEnabled)
})
Expand All @@ -282,11 +282,14 @@ func TestOperator_SchedulerSetConfiguration(t *testing.T) {
body := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true
}}`))
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/config", body)
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body)
resp := httptest.NewRecorder()
_, err := s.Server.OperatorSchedulerConfiguration(resp, req)
setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse)
require.True(ok)
require.NotZero(schedSetResp.Index)

args := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Expand All @@ -308,11 +311,14 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
body := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": true
}}`))
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/config", body)
req, _ := http.NewRequest("PUT", "/v1/operator/scheduler/configuration", body)
resp := httptest.NewRecorder()
_, err := s.Server.OperatorSchedulerConfiguration(resp, req)
setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.Equal(200, resp.Code)
schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse)
require.True(ok)
require.NotZero(schedSetResp.Index)

args := structs.GenericRequest{
QueryOptions: structs.QueryOptions{
Expand All @@ -331,23 +337,31 @@ func TestOperator_SchedulerCASConfiguration(t *testing.T) {
buf := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": false
}}`))
req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/config?cas=%d", reply.ModifyIndex-1), buf)
req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/configuration?cas=%d", reply.QueryMeta.Index-1), buf)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.False(obj.(bool))
// Verify that the response has Updated=false
schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse)
require.True(ok)
require.NotZero(schedSetResp.Index)
require.False(schedSetResp.Updated)
}

// Create a CAS request, good index
{
buf := bytes.NewBuffer([]byte(`{"PreemptionConfig": {
"SystemSchedulerEnabled": false
}}`))
req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/config?cas=%d", reply.ModifyIndex), buf)
req, _ := http.NewRequest("PUT", fmt.Sprintf("/v1/operator/scheduler/configuration?cas=%d", reply.QueryMeta.Index), buf)
resp := httptest.NewRecorder()
obj, err := s.Server.OperatorSchedulerConfiguration(resp, req)
setResp, err := s.Server.OperatorSchedulerConfiguration(resp, req)
require.Nil(err)
require.True(obj.(bool))
// Verify that the response has Updated=true
schedSetResp, ok := setResp.(structs.SchedulerSetConfigurationResponse)
require.True(ok)
require.NotZero(schedSetResp.Index)
require.True(schedSetResp.Updated)
}

// Verify the update
Expand Down
22 changes: 11 additions & 11 deletions nomad/operator_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func (op *Operator) ServerHealth(args *structs.GenericRequest, reply *autopilot.
}

// SchedulerSetConfiguration is used to set the current Scheduler configuration.
func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRequest, reply *bool) error {
func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRequest, reply *structs.SchedulerSetConfigurationResponse) error {
if done, err := op.srv.forward("Operator.SchedulerSetConfiguration", args, args, reply); done {
return err
}
Expand All @@ -300,18 +300,20 @@ func (op *Operator) SchedulerSetConfiguration(args *structs.SchedulerSetConfigRe
}

// Apply the update
resp, _, err := op.srv.raftApply(structs.SchedulerConfigRequestType, args)
resp, index, err := op.srv.raftApply(structs.SchedulerConfigRequestType, args)
if err != nil {
op.logger.Error("failed applying Scheduler configuration", "error", err)
return err
} else if respErr, ok := resp.(error); ok {
return respErr
}

// Check if the return type is a bool.
// Check if the return type is a bool
// Only applies to CAS requests
if respBool, ok := resp.(bool); ok {
*reply = respBool
reply.Updated = respBool
}
reply.Index = index
return nil
}

Expand All @@ -330,19 +332,17 @@ func (op *Operator) SchedulerGetConfiguration(args *structs.GenericRequest, repl
}

state := op.srv.fsm.State()
_, config, err := state.SchedulerConfig()
index, config, err := state.SchedulerConfig()

if err != nil {
return err
} else if config == nil {
return fmt.Errorf("scheduler config not initialized yet")
}

resp := &structs.SchedulerConfigurationResponse{
SchedulerConfig: *config,
CreateIndex: config.CreateIndex,
ModifyIndex: config.ModifyIndex,
}
*reply = *resp
reply.SchedulerConfig = config
reply.QueryMeta.Index = index
op.srv.setQueryMeta(&reply.QueryMeta)

return nil
}
Loading

0 comments on commit 3c5762b

Please sign in to comment.