From 93356e7d7007359544688c35d2332b8f14c55b11 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Wed, 2 May 2018 16:49:47 -0700 Subject: [PATCH 1/6] consul: initial grpc implementation Needs to be more like http. --- api/tasks.go | 2 ++ client/task_runner.go | 1 + command/agent/consul/client.go | 10 ++++++++++ command/agent/job_endpoint.go | 2 ++ jobspec/parse.go | 2 ++ jobspec/parse_test.go | 12 +++++++----- jobspec/test-fixtures/basic.hcl | 12 +++++++----- nomad/structs/structs.go | 17 +++++++++++++++++ 8 files changed, 48 insertions(+), 10 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index 4895312efcf..81ec6c00a5c 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -286,6 +286,8 @@ type ServiceCheck struct { Header map[string][]string Method string CheckRestart *CheckRestart `mapstructure:"check_restart"` + GRPC string + GRPCUseTLS bool `mapstructure:"grpc_use_tls"` } // The Service model represents a Consul service definition diff --git a/client/task_runner.go b/client/task_runner.go index 2a0303576ae..83166fed1db 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1516,6 +1516,7 @@ func interpolateServices(taskEnv *env.TaskEnv, task *structs.Task) *structs.Task check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel) check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus) check.Method = taskEnv.ReplaceEnv(check.Method) + check.GRPC = taskEnv.ReplaceEnv(check.GRPC) if len(check.Header) > 0 { header := make(map[string][]string, len(check.Header)) for k, vs := range check.Header { diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index cc0a26d411e..140d2d8d98d 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -1094,12 +1094,22 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host chkReg.HTTP = url.String() chkReg.Method = check.Method chkReg.Header = check.Header + case structs.ServiceCheckTCP: chkReg.TCP = net.JoinHostPort(host, 
strconv.Itoa(port)) + case structs.ServiceCheckScript: chkReg.TTL = (check.Interval + ttlCheckBuffer).String() // As of Consul 1.0.0 setting TTL and Interval is a 400 chkReg.Interval = "" + + case structs.ServiceCheckGRPC: + chkReg.GRPC = check.GRPC + chkReg.GRPCUseTLS = check.GRPCUseTLS + if check.TLSSkipVerify { + chkReg.TLSSkipVerify = true + } + default: return nil, fmt.Errorf("check type %+q not valid", check.Type) } diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index 2a0a92b1763..aa74a4233cd 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -764,6 +764,8 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { TLSSkipVerify: check.TLSSkipVerify, Header: check.Header, Method: check.Method, + GRPC: check.GRPC, + GRPCUseTLS: check.GRPCUseTLS, } if check.CheckRestart != nil { structsTask.Services[i].Checks[j].CheckRestart = &structs.CheckRestart{ diff --git a/jobspec/parse.go b/jobspec/parse.go index e56161cd4c4..dc5f61c1518 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -1061,6 +1061,8 @@ func parseChecks(service *api.Service, checkObjs *ast.ObjectList) error { "method", "check_restart", "address_mode", + "grpc", + "grpc_use_tls", } if err := helper.CheckHCLKeys(co.Val, valid); err != nil { return multierror.Prefix(err, "check ->") diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 6a705bef6c0..067397947a1 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -135,11 +135,13 @@ func TestParse(t *testing.T) { PortLabel: "http", Checks: []api.ServiceCheck{ { - Name: "check-name", - Type: "tcp", - PortLabel: "admin", - Interval: 10 * time.Second, - Timeout: 2 * time.Second, + Name: "check-name", + Type: "tcp", + PortLabel: "admin", + Interval: 10 * time.Second, + Timeout: 2 * time.Second, + GRPC: "localhost:12345/foo", + GRPCUseTLS: true, CheckRestart: &api.CheckRestart{ Limit: 3, Grace: helper.TimeToPtr(10 * time.Second), diff --git 
a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index 2b3f973aa9c..624390bf160 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -102,11 +102,13 @@ job "binstore-storagelocker" { port = "http" check { - name = "check-name" - type = "tcp" - interval = "10s" - timeout = "2s" - port = "admin" + name = "check-name" + type = "tcp" + interval = "10s" + timeout = "2s" + port = "admin" + grpc = "localhost:12345/foo" + grpc_use_tls = true check_restart { limit = 3 diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 601942951ae..7f0036031cf 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3515,6 +3515,7 @@ const ( ServiceCheckHTTP = "http" ServiceCheckTCP = "tcp" ServiceCheckScript = "script" + ServiceCheckGRPC = "grpc" // minCheckInterval is the minimum check interval permitted. Consul // currently has its MinInterval set to 1s. Mirror that here for @@ -3544,6 +3545,8 @@ type ServiceCheck struct { Method string // HTTP Method to use (GET by default) Header map[string][]string // HTTP Headers for Consul to set when making HTTP checks CheckRestart *CheckRestart // If and when a task should be restarted based on checks + GRPC string // Endpoint for GRPC checks + GRPCUseTLS bool // Whether or not to use TLS for GRPC checks } func (sc *ServiceCheck) Copy() *ServiceCheck { @@ -3601,6 +3604,12 @@ func (sc *ServiceCheck) validate() error { if sc.Command == "" { return fmt.Errorf("script type must have a valid script path") } + + case ServiceCheckGRPC: + if sc.GRPC == "" { + return fmt.Errorf("grpc type must have a valid endpoint") + } + default: return fmt.Errorf(`invalid type (%+q), must be one of "http", "tcp", or "script" type`, sc.Type) } @@ -3696,6 +3705,14 @@ func (sc *ServiceCheck) Hash(serviceID string) string { io.WriteString(h, sc.AddressMode) } + // Only include GRPC if set to maintain ID stability with Nomad <0.8.4 + if sc.GRPC != "" { + io.WriteString(h, sc.GRPC) + } + if 
sc.GRPCUseTLS { + io.WriteString(h, "true") + } + return fmt.Sprintf("%x", h.Sum(nil)) } From 882bf5a62cb3cd215daabc402c9c8ce9e0048ee0 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 3 May 2018 11:27:28 -0700 Subject: [PATCH 2/6] vendor: update consul for grpc --- vendor/github.com/hashicorp/consul/api/acl.go | 2 +- .../github.com/hashicorp/consul/api/agent.go | 43 ++++--- vendor/github.com/hashicorp/consul/api/api.go | 32 +++-- .../hashicorp/consul/api/catalog.go | 2 + .../hashicorp/consul/api/coordinate.go | 38 ++++++ .../github.com/hashicorp/consul/api/health.go | 15 +++ vendor/github.com/hashicorp/consul/api/kv.go | 4 +- .../github.com/hashicorp/consul/api/lock.go | 2 +- .../consul/api/operator_autopilot.go | 2 +- .../hashicorp/consul/api/prepared_query.go | 8 +- .../hashicorp/consul/api/semaphore.go | 2 +- .../hashicorp/consul/command/flags/http.go | 8 +- .../hashicorp/consul/lib/freeport/freeport.go | 2 +- vendor/github.com/hashicorp/consul/lib/rtt.go | 4 +- .../github.com/hashicorp/consul/lib/serf.go | 20 ++++ .../hashicorp/consul/testutil/retry/retry.go | 7 +- .../hashicorp/consul/testutil/server.go | 35 +----- .../hashicorp/serf/coordinate/client.go | 16 +++ .../github.com/hashicorp/serf/serf/config.go | 11 ++ .../hashicorp/serf/serf/ping_delegate.go | 3 +- vendor/github.com/hashicorp/serf/serf/serf.go | 40 +++++-- .../hashicorp/serf/serf/snapshot.go | 111 +++++------------- vendor/vendor.json | 18 +-- 23 files changed, 252 insertions(+), 173 deletions(-) create mode 100644 vendor/github.com/hashicorp/consul/lib/serf.go diff --git a/vendor/github.com/hashicorp/consul/api/acl.go b/vendor/github.com/hashicorp/consul/api/acl.go index 6ea0a752e58..8ec9aa58557 100644 --- a/vendor/github.com/hashicorp/consul/api/acl.go +++ b/vendor/github.com/hashicorp/consul/api/acl.go @@ -5,7 +5,7 @@ import ( ) const ( - // ACLCLientType is the client type token + // ACLClientType is the client type token ACLClientType = "client" // ACLManagementType is the 
management type token diff --git a/vendor/github.com/hashicorp/consul/api/agent.go b/vendor/github.com/hashicorp/consul/api/agent.go index ac57415c151..b42baed41d3 100644 --- a/vendor/github.com/hashicorp/consul/api/agent.go +++ b/vendor/github.com/hashicorp/consul/api/agent.go @@ -15,6 +15,7 @@ type AgentCheck struct { Output string ServiceID string ServiceName string + Definition HealthCheckDefinition } // AgentService represents a service known to the agent @@ -59,12 +60,13 @@ type MembersOpts struct { // AgentServiceRegistration is used to register a new service type AgentServiceRegistration struct { - ID string `json:",omitempty"` - Name string `json:",omitempty"` - Tags []string `json:",omitempty"` - Port int `json:",omitempty"` - Address string `json:",omitempty"` - EnableTagOverride bool `json:",omitempty"` + ID string `json:",omitempty"` + Name string `json:",omitempty"` + Tags []string `json:",omitempty"` + Port int `json:",omitempty"` + Address string `json:",omitempty"` + EnableTagOverride bool `json:",omitempty"` + Meta map[string]string `json:",omitempty"` Check *AgentServiceCheck Checks AgentServiceChecks } @@ -80,7 +82,10 @@ type AgentCheckRegistration struct { // AgentServiceCheck is used to define a node or service level check type AgentServiceCheck struct { - Script string `json:",omitempty"` + CheckID string `json:",omitempty"` + Name string `json:",omitempty"` + Args []string `json:"ScriptArgs,omitempty"` + Script string `json:",omitempty"` // Deprecated, use Args. DockerContainerID string `json:",omitempty"` Shell string `json:",omitempty"` // Only supported for Docker. 
Interval string `json:",omitempty"` @@ -93,6 +98,8 @@ type AgentServiceCheck struct { Status string `json:",omitempty"` Notes string `json:",omitempty"` TLSSkipVerify bool `json:",omitempty"` + GRPC string `json:",omitempty"` + GRPCUseTLS bool `json:",omitempty"` // In Consul 0.7 and later, checks that are associated with a service // may also contain this optional DeregisterCriticalServiceAfter field, @@ -580,36 +587,36 @@ func (a *Agent) Monitor(loglevel string, stopCh <-chan struct{}, q *QueryOptions // UpdateACLToken updates the agent's "acl_token". See updateToken for more // details. -func (c *Agent) UpdateACLToken(token string, q *WriteOptions) (*WriteMeta, error) { - return c.updateToken("acl_token", token, q) +func (a *Agent) UpdateACLToken(token string, q *WriteOptions) (*WriteMeta, error) { + return a.updateToken("acl_token", token, q) } // UpdateACLAgentToken updates the agent's "acl_agent_token". See updateToken // for more details. -func (c *Agent) UpdateACLAgentToken(token string, q *WriteOptions) (*WriteMeta, error) { - return c.updateToken("acl_agent_token", token, q) +func (a *Agent) UpdateACLAgentToken(token string, q *WriteOptions) (*WriteMeta, error) { + return a.updateToken("acl_agent_token", token, q) } // UpdateACLAgentMasterToken updates the agent's "acl_agent_master_token". See // updateToken for more details. -func (c *Agent) UpdateACLAgentMasterToken(token string, q *WriteOptions) (*WriteMeta, error) { - return c.updateToken("acl_agent_master_token", token, q) +func (a *Agent) UpdateACLAgentMasterToken(token string, q *WriteOptions) (*WriteMeta, error) { + return a.updateToken("acl_agent_master_token", token, q) } // UpdateACLReplicationToken updates the agent's "acl_replication_token". See // updateToken for more details. 
-func (c *Agent) UpdateACLReplicationToken(token string, q *WriteOptions) (*WriteMeta, error) { - return c.updateToken("acl_replication_token", token, q) +func (a *Agent) UpdateACLReplicationToken(token string, q *WriteOptions) (*WriteMeta, error) { + return a.updateToken("acl_replication_token", token, q) } // updateToken can be used to update an agent's ACL token after the agent has // started. The tokens are not persisted, so will need to be updated again if // the agent is restarted. -func (c *Agent) updateToken(target, token string, q *WriteOptions) (*WriteMeta, error) { - r := c.c.newRequest("PUT", fmt.Sprintf("/v1/agent/token/%s", target)) +func (a *Agent) updateToken(target, token string, q *WriteOptions) (*WriteMeta, error) { + r := a.c.newRequest("PUT", fmt.Sprintf("/v1/agent/token/%s", target)) r.setWriteOptions(q) r.obj = &AgentToken{Token: token} - rtt, resp, err := requireOK(c.c.doRequest(r)) + rtt, resp, err := requireOK(a.c.doRequest(r)) if err != nil { return nil, err } diff --git a/vendor/github.com/hashicorp/consul/api/api.go b/vendor/github.com/hashicorp/consul/api/api.go index 87cd3c5a377..1cdc21e3317 100644 --- a/vendor/github.com/hashicorp/consul/api/api.go +++ b/vendor/github.com/hashicorp/consul/api/api.go @@ -101,7 +101,7 @@ type QueryOptions struct { // be provided for filtering. NodeMeta map[string]string - // RelayFactor is used in keyring operations to cause reponses to be + // RelayFactor is used in keyring operations to cause responses to be // relayed back to the sender through N other random nodes. Must be // a value from 0 to 5 (inclusive). RelayFactor uint8 @@ -137,7 +137,7 @@ type WriteOptions struct { // which overrides the agent's default token. Token string - // RelayFactor is used in keyring operations to cause reponses to be + // RelayFactor is used in keyring operations to cause responses to be // relayed back to the sender through N other random nodes. Must be // a value from 0 to 5 (inclusive). 
RelayFactor uint8 @@ -377,12 +377,14 @@ func SetupTLSConfig(tlsConfig *TLSConfig) (*tls.Config, error) { tlsClientConfig.Certificates = []tls.Certificate{tlsCert} } - rootConfig := &rootcerts.Config{ - CAFile: tlsConfig.CAFile, - CAPath: tlsConfig.CAPath, - } - if err := rootcerts.ConfigureTLS(tlsClientConfig, rootConfig); err != nil { - return nil, err + if tlsConfig.CAFile != "" || tlsConfig.CAPath != "" { + rootConfig := &rootcerts.Config{ + CAFile: tlsConfig.CAFile, + CAPath: tlsConfig.CAPath, + } + if err := rootcerts.ConfigureTLS(tlsClientConfig, rootConfig); err != nil { + return nil, err + } } return tlsClientConfig, nil @@ -477,6 +479,14 @@ func NewHttpClient(transport *http.Transport, tlsConf TLSConfig) (*http.Client, Transport: transport, } + // TODO (slackpad) - Once we get some run time on the HTTP/2 support we + // should turn it on by default if TLS is enabled. We would basically + // just need to call http2.ConfigureTransport(transport) here. We also + // don't want to introduce another external dependency on + // golang.org/x/net/http2 at this time. For a complete recipe for how + // to enable HTTP/2 support on a transport suitable for the API client + // library see agent/http_test.go:TestHTTPServer_H2. 
+ if transport.TLSClientConfig == nil { tlsClientConfig, err := SetupTLSConfig(&tlsConf) @@ -623,9 +633,9 @@ func (r *request) toHTTP() (*http.Request, error) { } if r.ctx != nil { return req.WithContext(r.ctx), nil - } else { - return req, nil } + + return req, nil } // newRequest is used to create a new request @@ -661,7 +671,7 @@ func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) { } start := time.Now() resp, err := c.config.HttpClient.Do(req) - diff := time.Now().Sub(start) + diff := time.Since(start) return diff, resp, err } diff --git a/vendor/github.com/hashicorp/consul/api/catalog.go b/vendor/github.com/hashicorp/consul/api/catalog.go index babfc9a1df4..80ce1bc8156 100644 --- a/vendor/github.com/hashicorp/consul/api/catalog.go +++ b/vendor/github.com/hashicorp/consul/api/catalog.go @@ -22,6 +22,7 @@ type CatalogService struct { ServiceName string ServiceAddress string ServiceTags []string + ServiceMeta map[string]string ServicePort int ServiceEnableTagOverride bool CreateIndex uint64 @@ -42,6 +43,7 @@ type CatalogRegistration struct { Datacenter string Service *AgentService Check *AgentCheck + SkipNodeUpdate bool } type CatalogDeregistration struct { diff --git a/vendor/github.com/hashicorp/consul/api/coordinate.go b/vendor/github.com/hashicorp/consul/api/coordinate.go index 90214e392ce..53318f11dd5 100644 --- a/vendor/github.com/hashicorp/consul/api/coordinate.go +++ b/vendor/github.com/hashicorp/consul/api/coordinate.go @@ -66,3 +66,41 @@ func (c *Coordinate) Nodes(q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, err } return out, qm, nil } + +// Update inserts or updates the LAN coordinate of a node. 
+func (c *Coordinate) Update(coord *CoordinateEntry, q *WriteOptions) (*WriteMeta, error) { + r := c.c.newRequest("PUT", "/v1/coordinate/update") + r.setWriteOptions(q) + r.obj = coord + rtt, resp, err := requireOK(c.c.doRequest(r)) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + wm := &WriteMeta{} + wm.RequestTime = rtt + + return wm, nil +} + +// Node is used to return the coordinates of a single in the LAN pool. +func (c *Coordinate) Node(node string, q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) { + r := c.c.newRequest("GET", "/v1/coordinate/node/"+node) + r.setQueryOptions(q) + rtt, resp, err := requireOK(c.c.doRequest(r)) + if err != nil { + return nil, nil, err + } + defer resp.Body.Close() + + qm := &QueryMeta{} + parseQueryMeta(resp, qm) + qm.RequestTime = rtt + + var out []*CoordinateEntry + if err := decodeBody(resp, &out); err != nil { + return nil, nil, err + } + return out, qm, nil +} diff --git a/vendor/github.com/hashicorp/consul/api/health.go b/vendor/github.com/hashicorp/consul/api/health.go index 38c105fdb93..53f3de4f795 100644 --- a/vendor/github.com/hashicorp/consul/api/health.go +++ b/vendor/github.com/hashicorp/consul/api/health.go @@ -34,6 +34,21 @@ type HealthCheck struct { ServiceID string ServiceName string ServiceTags []string + + Definition HealthCheckDefinition +} + +// HealthCheckDefinition is used to store the details about +// a health check's execution. +type HealthCheckDefinition struct { + HTTP string + Header map[string][]string + Method string + TLSSkipVerify bool + TCP string + Interval ReadableDuration + Timeout ReadableDuration + DeregisterCriticalServiceAfter ReadableDuration } // HealthChecks is a collection of HealthCheck structs. 
diff --git a/vendor/github.com/hashicorp/consul/api/kv.go b/vendor/github.com/hashicorp/consul/api/kv.go index 5990d3d13a5..97f51568559 100644 --- a/vendor/github.com/hashicorp/consul/api/kv.go +++ b/vendor/github.com/hashicorp/consul/api/kv.go @@ -252,7 +252,7 @@ func (k *KV) put(key string, params map[string]string, body []byte, q *WriteOpti if _, err := io.Copy(&buf, resp.Body); err != nil { return false, nil, fmt.Errorf("Failed to read response: %v", err) } - res := strings.Contains(string(buf.Bytes()), "true") + res := strings.Contains(buf.String(), "true") return res, qm, nil } @@ -296,7 +296,7 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption if _, err := io.Copy(&buf, resp.Body); err != nil { return false, nil, fmt.Errorf("Failed to read response: %v", err) } - res := strings.Contains(string(buf.Bytes()), "true") + res := strings.Contains(buf.String(), "true") return res, qm, nil } diff --git a/vendor/github.com/hashicorp/consul/api/lock.go b/vendor/github.com/hashicorp/consul/api/lock.go index d3187113f72..41f72e7d23a 100644 --- a/vendor/github.com/hashicorp/consul/api/lock.go +++ b/vendor/github.com/hashicorp/consul/api/lock.go @@ -180,7 +180,7 @@ WAIT: // Handle the one-shot mode. 
if l.opts.LockTryOnce && attempts > 0 { - elapsed := time.Now().Sub(start) + elapsed := time.Since(start) if elapsed > qOpts.WaitTime { return nil, nil } diff --git a/vendor/github.com/hashicorp/consul/api/operator_autopilot.go b/vendor/github.com/hashicorp/consul/api/operator_autopilot.go index 0fa9d160403..b179406dc12 100644 --- a/vendor/github.com/hashicorp/consul/api/operator_autopilot.go +++ b/vendor/github.com/hashicorp/consul/api/operator_autopilot.go @@ -196,7 +196,7 @@ func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *W if _, err := io.Copy(&buf, resp.Body); err != nil { return false, fmt.Errorf("Failed to read response: %v", err) } - res := strings.Contains(string(buf.Bytes()), "true") + res := strings.Contains(buf.String(), "true") return res, nil } diff --git a/vendor/github.com/hashicorp/consul/api/prepared_query.go b/vendor/github.com/hashicorp/consul/api/prepared_query.go index ff210de3f00..d322dd86794 100644 --- a/vendor/github.com/hashicorp/consul/api/prepared_query.go +++ b/vendor/github.com/hashicorp/consul/api/prepared_query.go @@ -34,6 +34,12 @@ type ServiceQuery struct { // local datacenter. Failover QueryDatacenterOptions + // IgnoreCheckIDs is an optional list of health check IDs to ignore when + // considering which nodes are healthy. It is useful as an emergency measure + // to temporarily override some health check that is producing false negatives + // for example. + IgnoreCheckIDs []string + // If OnlyPassing is true then we will only include nodes with passing // health checks (critical AND warning checks will cause a node to be // discarded) @@ -61,7 +67,7 @@ type QueryTemplate struct { Regexp string } -// PrepatedQueryDefinition defines a complete prepared query. +// PreparedQueryDefinition defines a complete prepared query. type PreparedQueryDefinition struct { // ID is this UUID-based ID for the query, always generated by Consul. 
ID string diff --git a/vendor/github.com/hashicorp/consul/api/semaphore.go b/vendor/github.com/hashicorp/consul/api/semaphore.go index d848dfd0b19..d0c57417788 100644 --- a/vendor/github.com/hashicorp/consul/api/semaphore.go +++ b/vendor/github.com/hashicorp/consul/api/semaphore.go @@ -198,7 +198,7 @@ WAIT: // Handle the one-shot mode. if s.opts.SemaphoreTryOnce && attempts > 0 { - elapsed := time.Now().Sub(start) + elapsed := time.Since(start) if elapsed > qOpts.WaitTime { return nil, nil } diff --git a/vendor/github.com/hashicorp/consul/command/flags/http.go b/vendor/github.com/hashicorp/consul/command/flags/http.go index 1fa78e16713..591567a4f0c 100644 --- a/vendor/github.com/hashicorp/consul/command/flags/http.go +++ b/vendor/github.com/hashicorp/consul/command/flags/http.go @@ -87,6 +87,12 @@ func (f *HTTPFlags) Token() string { func (f *HTTPFlags) APIClient() (*api.Client, error) { c := api.DefaultConfig() + f.MergeOntoConfig(c) + + return api.NewClient(c) +} + +func (f *HTTPFlags) MergeOntoConfig(c *api.Config) { f.address.Merge(&c.Address) f.token.Merge(&c.Token) f.caFile.Merge(&c.TLSConfig.CAFile) @@ -95,6 +101,4 @@ func (f *HTTPFlags) APIClient() (*api.Client, error) { f.keyFile.Merge(&c.TLSConfig.KeyFile) f.tlsServerName.Merge(&c.TLSConfig.Address) f.datacenter.Merge(&c.Datacenter) - - return api.NewClient(c) } diff --git a/vendor/github.com/hashicorp/consul/lib/freeport/freeport.go b/vendor/github.com/hashicorp/consul/lib/freeport/freeport.go index 882998314f5..806449ba4a0 100644 --- a/vendor/github.com/hashicorp/consul/lib/freeport/freeport.go +++ b/vendor/github.com/hashicorp/consul/lib/freeport/freeport.go @@ -16,7 +16,7 @@ const ( // blockSize is the size of the allocated port block. ports are given out // consecutively from that block with roll-over for the lifetime of the // application/test run. - blockSize = 500 + blockSize = 1500 // maxBlocks is the number of available port blocks. // lowPort + maxBlocks * blockSize must be less than 65535. 
diff --git a/vendor/github.com/hashicorp/consul/lib/rtt.go b/vendor/github.com/hashicorp/consul/lib/rtt.go index a417f533c91..fb9090ab1db 100644 --- a/vendor/github.com/hashicorp/consul/lib/rtt.go +++ b/vendor/github.com/hashicorp/consul/lib/rtt.go @@ -36,14 +36,14 @@ func (cs CoordinateSet) Intersect(other CoordinateSet) (*coordinate.Coordinate, // we are possibly a client. Any node with more than one segment can only // be a server, which means it should be in all segments. if len(cs) == 1 { - for s, _ := range cs { + for s := range cs { segment = s } } // Likewise for the other set. if len(other) == 1 { - for s, _ := range other { + for s := range other { segment = s } } diff --git a/vendor/github.com/hashicorp/consul/lib/serf.go b/vendor/github.com/hashicorp/consul/lib/serf.go new file mode 100644 index 00000000000..ee7fad53002 --- /dev/null +++ b/vendor/github.com/hashicorp/consul/lib/serf.go @@ -0,0 +1,20 @@ +package lib + +import ( + "github.com/hashicorp/serf/serf" +) + +// SerfDefaultConfig returns a Consul-flavored Serf default configuration, +// suitable as a basis for a LAN, WAN, segment, or area. +func SerfDefaultConfig() *serf.Config { + base := serf.DefaultConfig() + + // This effectively disables the annoying queue depth warnings. + base.QueueDepthWarning = 1000000 + + // This enables dynamic sizing of the message queue depth based on the + // cluster size. 
+ base.MinQueueDepth = 4096 + + return base +} diff --git a/vendor/github.com/hashicorp/consul/testutil/retry/retry.go b/vendor/github.com/hashicorp/consul/testutil/retry/retry.go index cfbdde3c9db..ba3a7c36b8b 100644 --- a/vendor/github.com/hashicorp/consul/testutil/retry/retry.go +++ b/vendor/github.com/hashicorp/consul/testutil/retry/retry.go @@ -82,7 +82,7 @@ func decorate(s string) string { } func Run(t Failer, f func(r *R)) { - run(TwoSeconds(), t, f) + run(DefaultFailer(), t, f) } func RunWith(r Retryer, t Failer, f func(r *R)) { @@ -133,6 +133,11 @@ func run(r Retryer, t Failer, f func(r *R)) { } } +// DefaultFailer provides default retry.Run() behavior for unit tests. +func DefaultFailer() *Timer { + return &Timer{Timeout: 7 * time.Second, Wait: 25 * time.Millisecond} +} + // TwoSeconds repeats an operation for two seconds and waits 25ms in between. func TwoSeconds() *Timer { return &Timer{Timeout: 2 * time.Second, Wait: 25 * time.Millisecond} diff --git a/vendor/github.com/hashicorp/consul/testutil/server.go b/vendor/github.com/hashicorp/consul/testutil/server.go index 93b734a671b..06c0fdfd283 100644 --- a/vendor/github.com/hashicorp/consul/testutil/server.go +++ b/vendor/github.com/hashicorp/consul/testutil/server.go @@ -27,7 +27,7 @@ import ( "testing" "time" - "github.com/hashicorp/consul/test/porter" + "github.com/hashicorp/consul/lib/freeport" "github.com/hashicorp/consul/testutil/retry" "github.com/hashicorp/go-cleanhttp" "github.com/hashicorp/go-uuid" @@ -111,17 +111,7 @@ func defaultServerConfig() *TestServerConfig { panic(err) } - ports, err := porter.RandomPorts(6) - if err != nil { - if _, ok := err.(*porter.PorterExistErr); ok { - // Fall back in the case that the testutil server is being used - // without porter. This should NEVER be used for Consul's own - // unit tests. See comments for getRandomPorts() for more details. 
- ports = getRandomPorts(6) - } else { - panic(err) - } - } + ports := freeport.Get(6) return &TestServerConfig{ NodeName: "node-" + nodeID, NodeID: nodeID, @@ -324,7 +314,7 @@ func (s *TestServer) waitForAPI() error { } defer resp.Body.Close() if err := s.requireOK(resp); err != nil { - r.Fatal("failed OK respose", err) + r.Fatal("failed OK response", err) } }) if f.failed { @@ -390,22 +380,3 @@ func (s *TestServer) waitForLeader() error { } return nil } - -// getRandomPorts returns a set of random port or panics on error. This -// is here to support external uses of testutil which may not have porter, -// but this has been shown not to work well with parallel tests (such as -// Consul's unit tests). This fallback should NEVER be used for Consul's -// own tests. -func getRandomPorts(n int) []int { - ports := make([]int, n) - for i := 0; i < n; i++ { - l, err := net.Listen("tcp", ":0") - if err != nil { - panic(err) - } - l.Close() - ports[i] = l.Addr().(*net.TCPAddr).Port - } - - return ports -} diff --git a/vendor/github.com/hashicorp/serf/coordinate/client.go b/vendor/github.com/hashicorp/serf/coordinate/client.go index 63f6241411b..3582ee4dae2 100644 --- a/vendor/github.com/hashicorp/serf/coordinate/client.go +++ b/vendor/github.com/hashicorp/serf/coordinate/client.go @@ -6,6 +6,8 @@ import ( "sort" "sync" "time" + + "github.com/armon/go-metrics" ) // Client manages the estimated network coordinate for a given node, and adjusts @@ -205,6 +207,20 @@ func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) (*Coo return nil, err } + // The code down below can handle zero RTTs, which we have seen in + // https://github.com/hashicorp/consul/issues/3789, presumably in + // environments with coarse-grained monotonic clocks (we are still + // trying to pin this down). In any event, this is ok from a code PoV + // so we don't need to alert operators with spammy messages. We did + // add a counter so this is still observable, though. 
+ const maxRTT = 10 * time.Second + if rtt < 0 || rtt > maxRTT { + return nil, fmt.Errorf("round trip time not in valid range, duration %v is not a positive value less than %v ", rtt, maxRTT) + } + if rtt == 0 { + metrics.IncrCounter([]string{"serf", "coordinate", "zero-rtt"}, 1) + } + rttSeconds := c.latencyFilter(node, rtt.Seconds()) c.updateVivaldi(other, rttSeconds) c.updateAdjustment(other, rttSeconds) diff --git a/vendor/github.com/hashicorp/serf/serf/config.go b/vendor/github.com/hashicorp/serf/serf/config.go index 74f21ffbdf7..ad4f51b18a7 100644 --- a/vendor/github.com/hashicorp/serf/serf/config.go +++ b/vendor/github.com/hashicorp/serf/serf/config.go @@ -112,6 +112,10 @@ type Config struct { // node. FlapTimeout time.Duration + // QueueCheckInterval is the interval at which we check the message + // queue to apply the warning and max depth. + QueueCheckInterval time.Duration + // QueueDepthWarning is used to generate warning message if the // number of queued messages to broadcast exceeds this number. This // is to provide the user feedback if events are being triggered @@ -123,6 +127,12 @@ type Config struct { // prevent an unbounded growth of memory utilization MaxQueueDepth int + // MinQueueDepth, if >0 will enforce a lower limit for dropping messages + // and then the max will be max(MinQueueDepth, 2*SizeOfCluster). This + // defaults to 0 which disables this dynamic sizing feature. If this is + // >0 then MaxQueueDepth will be ignored. + MinQueueDepth int + // RecentIntentTimeout is used to determine how long we store recent // join and leave intents. This is used to guard against the case where // Serf broadcasts an intent that arrives before the Memberlist event. 
@@ -253,6 +263,7 @@ func DefaultConfig() *Config { RecentIntentTimeout: 5 * time.Minute, ReconnectInterval: 30 * time.Second, ReconnectTimeout: 24 * time.Hour, + QueueCheckInterval: 30 * time.Second, QueueDepthWarning: 128, MaxQueueDepth: 4096, TombstoneTimeout: 24 * time.Hour, diff --git a/vendor/github.com/hashicorp/serf/serf/ping_delegate.go b/vendor/github.com/hashicorp/serf/serf/ping_delegate.go index a4c44028eda..98032c5bea1 100644 --- a/vendor/github.com/hashicorp/serf/serf/ping_delegate.go +++ b/vendor/github.com/hashicorp/serf/serf/ping_delegate.go @@ -68,7 +68,8 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat before := p.serf.coordClient.GetCoordinate() after, err := p.serf.coordClient.Update(other.Name, &coord, rtt) if err != nil { - p.serf.logger.Printf("[ERR] serf: Rejected coordinate from %s: %v\n", + metrics.IncrCounter([]string{"serf", "coordinate", "rejected"}, 1) + p.serf.logger.Printf("[TRACE] serf: Rejected coordinate from %s: %v\n", other.Name, err) return } diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go index 7256eafab1c..3e89fa11f8b 100644 --- a/vendor/github.com/hashicorp/serf/serf/serf.go +++ b/vendor/github.com/hashicorp/serf/serf/serf.go @@ -314,7 +314,6 @@ func Create(conf *Config) (*Serf, error) { conf.RejoinAfterLeave, serf.logger, &serf.clock, - serf.coordClient, conf.EventCh, serf.shutdownCh) if err != nil { @@ -1516,21 +1515,37 @@ func (s *Serf) reconnect() { s.memberlist.Join([]string{addr.String()}) } +// getQueueMax will get the maximum queue depth, which might be dynamic depending +// on how Serf is configured. 
+func (s *Serf) getQueueMax() int { + max := s.config.MaxQueueDepth + if s.config.MinQueueDepth > 0 { + s.memberLock.RLock() + max = 2 * len(s.members) + s.memberLock.RUnlock() + + if max < s.config.MinQueueDepth { + max = s.config.MinQueueDepth + } + } + return max +} + // checkQueueDepth periodically checks the size of a queue to see if // it is too large func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) { for { select { - case <-time.After(time.Second): + case <-time.After(s.config.QueueCheckInterval): numq := queue.NumQueued() metrics.AddSample([]string{"serf", "queue", name}, float32(numq)) if numq >= s.config.QueueDepthWarning { s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq) } - if numq > s.config.MaxQueueDepth { + if max := s.getQueueMax(); numq > max { s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!", - name, numq, s.config.MaxQueueDepth) - queue.Prune(s.config.MaxQueueDepth) + name, numq, max) + queue.Prune(max) } case <-s.shutdownCh: return @@ -1654,11 +1669,18 @@ func (s *Serf) Stats() map[string]string { toString := func(v uint64) string { return strconv.FormatUint(v, 10) } + s.memberLock.RLock() + members := toString(uint64(len(s.members))) + failed := toString(uint64(len(s.failedMembers))) + left := toString(uint64(len(s.leftMembers))) + health_score := toString(uint64(s.memberlist.GetHealthScore())) + + s.memberLock.RUnlock() stats := map[string]string{ - "members": toString(uint64(len(s.members))), - "failed": toString(uint64(len(s.failedMembers))), - "left": toString(uint64(len(s.leftMembers))), - "health_score": toString(uint64(s.memberlist.GetHealthScore())), + "members": members, + "failed": failed, + "left": left, + "health_score": health_score, "member_time": toString(uint64(s.clock.Time())), "event_time": toString(uint64(s.eventClock.Time())), "query_time": toString(uint64(s.queryClock.Time())), diff --git 
a/vendor/github.com/hashicorp/serf/serf/snapshot.go b/vendor/github.com/hashicorp/serf/serf/snapshot.go index 8e15f3f31d0..9f5adebe625 100644 --- a/vendor/github.com/hashicorp/serf/serf/snapshot.go +++ b/vendor/github.com/hashicorp/serf/serf/snapshot.go @@ -2,7 +2,6 @@ package serf import ( "bufio" - "encoding/json" "fmt" "log" "math/rand" @@ -13,7 +12,6 @@ import ( "time" "github.com/armon/go-metrics" - "github.com/hashicorp/serf/coordinate" ) /* @@ -29,34 +27,32 @@ old events. const flushInterval = 500 * time.Millisecond const clockUpdateInterval = 500 * time.Millisecond -const coordinateUpdateInterval = 60 * time.Second const tmpExt = ".compact" const snapshotErrorRecoveryInterval = 30 * time.Second // Snapshotter is responsible for ingesting events and persisting // them to disk, and providing a recovery mechanism at start time. type Snapshotter struct { - aliveNodes map[string]string - clock *LamportClock - coordClient *coordinate.Client - fh *os.File - buffered *bufio.Writer - inCh <-chan Event - lastFlush time.Time - lastClock LamportTime - lastEventClock LamportTime - lastQueryClock LamportTime - leaveCh chan struct{} - leaving bool - logger *log.Logger - maxSize int64 - path string - offset int64 - outCh chan<- Event - rejoinAfterLeave bool - shutdownCh <-chan struct{} - waitCh chan struct{} - lastAttemptedCompaction time.Time + aliveNodes map[string]string + clock *LamportClock + fh *os.File + buffered *bufio.Writer + inCh <-chan Event + lastFlush time.Time + lastClock LamportTime + lastEventClock LamportTime + lastQueryClock LamportTime + leaveCh chan struct{} + leaving bool + logger *log.Logger + maxSize int64 + path string + offset int64 + outCh chan<- Event + rejoinAfterLeave bool + shutdownCh <-chan struct{} + waitCh chan struct{} + lastAttemptedCompaction time.Time } // PreviousNode is used to represent the previously known alive nodes @@ -80,7 +76,6 @@ func NewSnapshotter(path string, rejoinAfterLeave bool, logger *log.Logger, clock *LamportClock, 
- coordClient *coordinate.Client, outCh chan<- Event, shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) { inCh := make(chan Event, 1024) @@ -103,7 +98,6 @@ func NewSnapshotter(path string, snap := &Snapshotter{ aliveNodes: make(map[string]string), clock: clock, - coordClient: coordClient, fh: fh, buffered: bufio.NewWriter(fh), inCh: inCh, @@ -182,9 +176,6 @@ func (s *Snapshotter) stream() { clockTicker := time.NewTicker(clockUpdateInterval) defer clockTicker.Stop() - coordinateTicker := time.NewTicker(coordinateUpdateInterval) - defer coordinateTicker.Stop() - for { select { case <-s.leaveCh: @@ -226,9 +217,6 @@ func (s *Snapshotter) stream() { case <-clockTicker.C: s.updateClock() - case <-coordinateTicker.C: - s.updateCoordinate() - case <-s.shutdownCh: if err := s.buffered.Flush(); err != nil { s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err) @@ -275,20 +263,6 @@ func (s *Snapshotter) updateClock() { } } -// updateCoordinate is called periodically to write out the current local -// coordinate. It's safe to call this if coordinates aren't enabled (nil -// client) and it will be a no-op. -func (s *Snapshotter) updateCoordinate() { - if s.coordClient != nil { - encoded, err := json.Marshal(s.coordClient.GetCoordinate()) - if err != nil { - s.logger.Printf("[ERR] serf: Failed to encode coordinate: %v", err) - } else { - s.tryAppend(fmt.Sprintf("coordinate: %s\n", encoded)) - } - } -} - // processUserEvent is used to handle a single user event func (s *Snapshotter) processUserEvent(e UserEvent) { // Ignore old clocks @@ -404,30 +378,22 @@ func (s *Snapshotter) compact() error { } offset += int64(n) - // Write out the coordinate. 
- if s.coordClient != nil { - encoded, err := json.Marshal(s.coordClient.GetCoordinate()) - if err != nil { - fh.Close() - return err - } - - line = fmt.Sprintf("coordinate: %s\n", encoded) - n, err = buf.WriteString(line) - if err != nil { - fh.Close() - return err - } - offset += int64(n) - } - // Flush the new snapshot err = buf.Flush() - fh.Close() + if err != nil { return fmt.Errorf("failed to flush new snapshot: %v", err) } + err = fh.Sync() + + if err != nil { + fh.Close() + return fmt.Errorf("failed to fsync new snapshot: %v", err) + } + + fh.Close() + // We now need to swap the old snapshot file with the new snapshot. // Turns out, Windows won't let us rename the files if we have // open handles to them or if the destination already exists. This @@ -533,22 +499,7 @@ func (s *Snapshotter) replay() error { s.lastQueryClock = LamportTime(timeInt) } else if strings.HasPrefix(line, "coordinate: ") { - if s.coordClient == nil { - s.logger.Printf("[WARN] serf: Ignoring snapshot coordinates since they are disabled") - continue - } - - coordStr := strings.TrimPrefix(line, "coordinate: ") - var coord coordinate.Coordinate - err := json.Unmarshal([]byte(coordStr), &coord) - if err != nil { - s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err) - continue - } - if err := s.coordClient.SetCoordinate(&coord); err != nil { - s.logger.Printf("[WARN] serf: Failed to set coordinate: %v", err) - continue - } + continue // Ignores any coordinate persistence from old snapshots, serf should re-converge } else if line == "leave" { // Ignore a leave if we plan on re-joining if s.rejoinAfterLeave { diff --git a/vendor/vendor.json b/vendor/vendor.json index 112d5d49ce2..c543ecfdcb3 100644 --- a/vendor/vendor.json +++ b/vendor/vendor.json @@ -124,14 +124,14 @@ {"path":"github.com/hashicorp/consul-template/template","checksumSHA1":"N9qobVzScLbTEnGE7MgFnnTbGBw=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"}, 
{"path":"github.com/hashicorp/consul-template/version","checksumSHA1":"NB5+D4AuCNV9Bsqh3YFdPi4AJ6U=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"}, {"path":"github.com/hashicorp/consul-template/watch","checksumSHA1":"b4+Y+02pY2Y5620F9ALzKg8Zmdw=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"}, - {"path":"github.com/hashicorp/consul/agent/consul/autopilot","checksumSHA1":"+I7fgoQlrnTUGW5krqNLadWwtjg=","revision":"d1ede2c93dec7b4580e37ef41d24371abab9d9e9","revisionTime":"2018-02-21T18:19:48Z"}, - {"path":"github.com/hashicorp/consul/api","checksumSHA1":"XLfcIX2qpRr0o26aFMjCOzvw6jo=","revision":"51ea240df8476e02215d53fbfad5838bf0d44d21","revisionTime":"2017-10-16T16:22:40Z"}, - {"path":"github.com/hashicorp/consul/command/flags","checksumSHA1":"XTQIYV+DPUVRKpVp0+y/78bWH3I=","revision":"d08ab9fd199434e5220276356ecf9617cfec1eb2","revisionTime":"2017-12-18T20:26:35Z"}, - {"path":"github.com/hashicorp/consul/lib","checksumSHA1":"HGljdtVaqi/e3DgIHymLRLfPYhw=","revision":"bcafded4e60982d0b71e730f0b8564d73cb1d715","revisionTime":"2017-10-31T16:39:15Z"}, - {"path":"github.com/hashicorp/consul/lib/freeport","checksumSHA1":"hDJiPli3EEGJE4vAezMi05oOC7o=","revision":"bcafded4e60982d0b71e730f0b8564d73cb1d715","revisionTime":"2017-10-31T16:39:15Z"}, + {"path":"github.com/hashicorp/consul/agent/consul/autopilot","checksumSHA1":"+I7fgoQlrnTUGW5krqNLadWwtjg=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"}, + {"path":"github.com/hashicorp/consul/api","checksumSHA1":"7UvyPiYTxcB8xqRlULAT3X8+8zE=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"}, + {"path":"github.com/hashicorp/consul/command/flags","checksumSHA1":"soNN4xaHTbeXFgNkZ7cX0gbFXQk=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"}, + 
{"path":"github.com/hashicorp/consul/lib","checksumSHA1":"Nrh9BhiivRyJiuPzttstmq9xl/w=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"}, + {"path":"github.com/hashicorp/consul/lib/freeport","checksumSHA1":"E28E4zR1FN2v1Xiq4FUER7KVN9M=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"}, {"path":"github.com/hashicorp/consul/test/porter","checksumSHA1":"5XjgqE4UIfwXvkq5VssGNc7uPhQ=","revision":"ad9425ca6353b8afcfebd19130a8cf768f7eac30","revisionTime":"2017-10-21T00:05:25Z"}, - {"path":"github.com/hashicorp/consul/testutil","checksumSHA1":"+go9ycmyfF4b0W174gc7ej5mnE8=","revision":"350932161d6745836c1b2f39849bddb0f9fb52fd","revisionTime":"2017-10-20T23:49:17Z"}, - {"path":"github.com/hashicorp/consul/testutil/retry","checksumSHA1":"J8TTDc84MvAyXE/FrfgS+xc/b6s=","revision":"350932161d6745836c1b2f39849bddb0f9fb52fd","revisionTime":"2017-10-20T23:49:17Z"}, + {"path":"github.com/hashicorp/consul/testutil","checksumSHA1":"T4CeQD+QRsjf1BJ1n7FSojS5zDQ=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"}, + {"path":"github.com/hashicorp/consul/testutil/retry","checksumSHA1":"SCb2b91UYiB/23+SNDBlU5OZfFA=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"}, {"path":"github.com/hashicorp/errwrap","revision":"7554cd9344cec97297fa6649b055a8c98c2a1e55"}, {"path":"github.com/hashicorp/go-checkpoint","checksumSHA1":"D267IUMW2rcb+vNe3QU+xhfSrgY=","revision":"1545e56e46dec3bba264e41fde2c1e2aa65b5dd4","revisionTime":"2017-10-09T17:35:28Z"}, {"path":"github.com/hashicorp/go-cleanhttp","checksumSHA1":"6ihdHMkDfFx/rJ1A36com2F6bQk=","revision":"a45970658e51fea2c41445ff0f7e07106d007617","revisionTime":"2017-02-11T00:33:01Z"}, @@ -168,8 +168,8 @@ {"path":"github.com/hashicorp/net-rpc-msgpackrpc","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3"}, 
{"path":"github.com/hashicorp/raft","checksumSHA1":"zkA9uvbj1BdlveyqXpVTh1N6ers=","revision":"077966dbc90f342107eb723ec52fdb0463ec789b","revisionTime":"2018-01-17T20:29:25Z","version":"master","versionExact":"master"}, {"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"}, - {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"/oss17GO4hXGM7QnUdI3VzcAHzA=","revision":"bbeddf0b3ab3072a60525afbd6b6f47d33839eee","revisionTime":"2017-07-14T18:26:01Z"}, - {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"pvLOzocYsZtxuJ9pREHRTxYnoa4=","revision":"bbeddf0b3ab3072a60525afbd6b6f47d33839eee","revisionTime":"2017-07-14T18:26:01Z"}, + {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"fc4bdedf2366c64984e280c6eefc703ca7812585","revisionTime":"2018-04-11T17:01:37Z"}, + {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"YzJaaeIJpxLfVDZYT1X2hpd8IK8=","revision":"fc4bdedf2366c64984e280c6eefc703ca7812585","revisionTime":"2018-04-11T17:01:37Z"}, {"path":"github.com/hashicorp/vault","checksumSHA1":"eGzvBRMFD6ZB3A6uO750np7Om/E=","revision":"182ba68a9589d4cef95234134aaa498a686e3de3","revisionTime":"2016-08-21T23:40:57Z"}, {"path":"github.com/hashicorp/vault/api","checksumSHA1":"mKN4rEIWyflT6aqJyjgu9m1tPXI=","revision":"3ddd3bd20cec0588788547aecd15e91461b9d546","revisionTime":"2018-04-03T21:11:47Z"}, {"path":"github.com/hashicorp/vault/helper/compressutil","checksumSHA1":"jHVLe8KMdEpb/ZALp0zu+tenADo=","revision":"3ddd3bd20cec0588788547aecd15e91461b9d546","revisionTime":"2018-04-03T21:11:47Z"}, From 905bef8f2d55df3927e38b4c79c2d30e69b51d98 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 3 May 2018 15:18:12 -0700 Subject: [PATCH 3/6] consul: make grpc checks more like http checks --- api/tasks.go | 4 +-- client/task_runner.go | 2 +- command/agent/consul/client.go | 2 +- 
command/agent/consul/unit_test.go | 43 +++++++++++++++++++++++++++--- command/agent/job_endpoint.go | 2 +- command/agent/job_endpoint_test.go | 4 +++ jobspec/parse.go | 2 +- jobspec/parse_test.go | 14 +++++----- jobspec/test-fixtures/basic.hcl | 2 +- nomad/structs/structs.go | 14 ++++------ nomad/structs/structs_test.go | 27 +++++++++++++++++++ 11 files changed, 90 insertions(+), 26 deletions(-) diff --git a/api/tasks.go b/api/tasks.go index 81ec6c00a5c..e6cb9863e8b 100644 --- a/api/tasks.go +++ b/api/tasks.go @@ -286,8 +286,8 @@ type ServiceCheck struct { Header map[string][]string Method string CheckRestart *CheckRestart `mapstructure:"check_restart"` - GRPC string - GRPCUseTLS bool `mapstructure:"grpc_use_tls"` + GRPCService string `mapstructure:"grpc_service"` + GRPCUseTLS bool `mapstructure:"grpc_use_tls"` } // The Service model represents a Consul service definition diff --git a/client/task_runner.go b/client/task_runner.go index 83166fed1db..ef893436a1f 100644 --- a/client/task_runner.go +++ b/client/task_runner.go @@ -1516,7 +1516,7 @@ func interpolateServices(taskEnv *env.TaskEnv, task *structs.Task) *structs.Task check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel) check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus) check.Method = taskEnv.ReplaceEnv(check.Method) - check.GRPC = taskEnv.ReplaceEnv(check.GRPC) + check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService) if len(check.Header) > 0 { header := make(map[string][]string, len(check.Header)) for k, vs := range check.Header { diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go index 140d2d8d98d..660a2e876de 100644 --- a/command/agent/consul/client.go +++ b/command/agent/consul/client.go @@ -1104,7 +1104,7 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host chkReg.Interval = "" case structs.ServiceCheckGRPC: - chkReg.GRPC = check.GRPC + chkReg.GRPC = fmt.Sprintf("%s/%s", net.JoinHostPort(host, strconv.Itoa(port)), check.GRPCService) 
chkReg.GRPCUseTLS = check.GRPCUseTLS if check.TLSSkipVerify { chkReg.TLSSkipVerify = true diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go index 9f72ee2ea21..37157fcdcdd 100644 --- a/command/agent/consul/unit_test.go +++ b/command/agent/consul/unit_test.go @@ -1391,9 +1391,10 @@ func TestIsNomadService(t *testing.T) { } } -// TestCreateCheckReg asserts Nomad ServiceCheck structs are properly converted -// to Consul API AgentCheckRegistrations. -func TestCreateCheckReg(t *testing.T) { +// TestCreateCheckReg_HTTP asserts Nomad ServiceCheck structs are properly +// converted to Consul API AgentCheckRegistrations for HTTP checks. +func TestCreateCheckReg_HTTP(t *testing.T) { + t.Parallel() check := &structs.ServiceCheck{ Name: "name", Type: "http", @@ -1435,6 +1436,42 @@ func TestCreateCheckReg(t *testing.T) { } } +// TestCreateCheckReg_GRPC asserts Nomad ServiceCheck structs are properly +// converted to Consul API AgentCheckRegistrations for GRPC checks. +func TestCreateCheckReg_GRPC(t *testing.T) { + t.Parallel() + check := &structs.ServiceCheck{ + Name: "name", + Type: "grpc", + PortLabel: "label", + GRPCService: "foo.Bar", + GRPCUseTLS: true, + TLSSkipVerify: true, + Timeout: time.Second, + Interval: time.Minute, + } + + serviceID := "testService" + checkID := check.Hash(serviceID) + + expected := &api.AgentCheckRegistration{ + ID: checkID, + Name: "name", + ServiceID: serviceID, + AgentServiceCheck: api.AgentServiceCheck{ + Timeout: "1s", + Interval: "1m0s", + GRPC: "localhost:8080/foo.Bar", + GRPCUseTLS: true, + TLSSkipVerify: true, + }, + } + + actual, err := createCheckReg(serviceID, checkID, check, "localhost", 8080) + require.NoError(t, err) + require.Equal(t, expected, actual) +} + // TestGetAddress asserts Nomad uses the correct ip and port for services and // checks depending on port labels, driver networks, and address mode. 
func TestGetAddress(t *testing.T) { diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go index aa74a4233cd..510680002d2 100644 --- a/command/agent/job_endpoint.go +++ b/command/agent/job_endpoint.go @@ -764,7 +764,7 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) { TLSSkipVerify: check.TLSSkipVerify, Header: check.Header, Method: check.Method, - GRPC: check.GRPC, + GRPCService: check.GRPCService, GRPCUseTLS: check.GRPCUseTLS, } if check.CheckRestart != nil { diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go index 1c80d6fac8f..fb129dc918b 100644 --- a/command/agent/job_endpoint_test.go +++ b/command/agent/job_endpoint_test.go @@ -1272,6 +1272,8 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Protocol: "http", PortLabel: "foo", AddressMode: "driver", + GRPCService: "foo.Bar", + GRPCUseTLS: true, Interval: 4 * time.Second, Timeout: 2 * time.Second, InitialStatus: "ok", @@ -1493,6 +1495,8 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) { Interval: 4 * time.Second, Timeout: 2 * time.Second, InitialStatus: "ok", + GRPCService: "foo.Bar", + GRPCUseTLS: true, CheckRestart: &structs.CheckRestart{ Limit: 3, Grace: 11 * time.Second, diff --git a/jobspec/parse.go b/jobspec/parse.go index dc5f61c1518..63157140e2e 100644 --- a/jobspec/parse.go +++ b/jobspec/parse.go @@ -1061,7 +1061,7 @@ func parseChecks(service *api.Service, checkObjs *ast.ObjectList) error { "method", "check_restart", "address_mode", - "grpc", + "grpc_service", "grpc_use_tls", } if err := helper.CheckHCLKeys(co.Val, valid); err != nil { diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go index 067397947a1..69875f7713d 100644 --- a/jobspec/parse_test.go +++ b/jobspec/parse_test.go @@ -135,13 +135,13 @@ func TestParse(t *testing.T) { PortLabel: "http", Checks: []api.ServiceCheck{ { - Name: "check-name", - Type: "tcp", - PortLabel: "admin", - Interval: 10 * time.Second, - Timeout: 2 * time.Second, - GRPC: 
"localhost:12345/foo", - GRPCUseTLS: true, + Name: "check-name", + Type: "tcp", + PortLabel: "admin", + Interval: 10 * time.Second, + Timeout: 2 * time.Second, + GRPCService: "foo.Bar", + GRPCUseTLS: true, CheckRestart: &api.CheckRestart{ Limit: 3, Grace: helper.TimeToPtr(10 * time.Second), diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl index 624390bf160..11f40b883ad 100644 --- a/jobspec/test-fixtures/basic.hcl +++ b/jobspec/test-fixtures/basic.hcl @@ -107,7 +107,7 @@ job "binstore-storagelocker" { interval = "10s" timeout = "2s" port = "admin" - grpc = "localhost:12345/foo" + grpc_service = "foo.Bar" grpc_use_tls = true check_restart { diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go index 7f0036031cf..40a9a34d37e 100644 --- a/nomad/structs/structs.go +++ b/nomad/structs/structs.go @@ -3545,7 +3545,7 @@ type ServiceCheck struct { Method string // HTTP Method to use (GET by default) Header map[string][]string // HTTP Headers for Consul to set when making HTTP checks CheckRestart *CheckRestart // If and when a task should be restarted based on checks - GRPC string // Endpoint for GRPC checks + GRPCService string // Service for GRPC checks GRPCUseTLS bool // Whether or not to use TLS for GRPC checks } @@ -3587,6 +3587,7 @@ func (sc *ServiceCheck) Canonicalize(serviceName string) { func (sc *ServiceCheck) validate() error { // Validate Type switch strings.ToLower(sc.Type) { + case ServiceCheckGRPC: case ServiceCheckTCP: case ServiceCheckHTTP: if sc.Path == "" { @@ -3605,11 +3606,6 @@ func (sc *ServiceCheck) validate() error { return fmt.Errorf("script type must have a valid script path") } - case ServiceCheckGRPC: - if sc.GRPC == "" { - return fmt.Errorf("grpc type must have a valid endpoint") - } - default: return fmt.Errorf(`invalid type (%+q), must be one of "http", "tcp", or "script" type`, sc.Type) } @@ -3654,7 +3650,7 @@ func (sc *ServiceCheck) validate() error { // RequiresPort returns whether the service check 
requires the task has a port. func (sc *ServiceCheck) RequiresPort() bool { switch sc.Type { - case ServiceCheckHTTP, ServiceCheckTCP: + case ServiceCheckGRPC, ServiceCheckHTTP, ServiceCheckTCP: return true default: return false @@ -3706,8 +3702,8 @@ func (sc *ServiceCheck) Hash(serviceID string) string { } // Only include GRPC if set to maintain ID stability with Nomad <0.8.4 - if sc.GRPC != "" { - io.WriteString(h, sc.GRPC) + if sc.GRPCService != "" { + io.WriteString(h, sc.GRPCService) } if sc.GRPCUseTLS { io.WriteString(h, "true") diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go index b8e63a023a7..ec65fe755af 100644 --- a/nomad/structs/structs_test.go +++ b/nomad/structs/structs_test.go @@ -1267,7 +1267,34 @@ func TestTask_Validate_Service_Check_AddressMode(t *testing.T) { } } +func TestTask_Validate_Service_Check_GRPC(t *testing.T) { + t.Parallel() + // Bad (no port) + invalidGRPC := &ServiceCheck{ + Type: ServiceCheckGRPC, + Interval: time.Second, + Timeout: time.Second, + } + service := &Service{ + Name: "test", + Checks: []*ServiceCheck{invalidGRPC}, + } + + assert.Error(t, service.Validate()) + + // Good + service.Checks[0] = &ServiceCheck{ + Type: ServiceCheckGRPC, + Interval: time.Second, + Timeout: time.Second, + PortLabel: "some-port-label", + } + + assert.NoError(t, service.Validate()) +} + func TestTask_Validate_Service_Check_CheckRestart(t *testing.T) { + t.Parallel() invalidCheckRestart := &CheckRestart{ Limit: -1, Grace: -1, From ae41986fc24de8f9f5284ce45d0bae34c8854e41 Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 3 May 2018 16:45:07 -0700 Subject: [PATCH 4/6] docs: document grpc health check support --- CHANGELOG.md | 3 +- .../docs/job-specification/service.html.md | 45 ++++++++++++++++--- 2 files changed, 42 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4866b2fea18..e5a03067de0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,9 @@ IMPROVEMENTS: * cli: Add node drain 
details to node status [[GH-4247](https://github.com/hashicorp/nomad/issues/4247)] - * command: add -short option to init command that emits a minimal + * command: Add -short option to init command that emits a minimal jobspec [[GH-4239](https://github.com/hashicorp/nomad/issues/4239)] + * discovery: Support Consul gRPC health checks. [[GH-4251](https://github.com/hashicorp/nomad/issues/4251)] ## 0.8.3 (April 27, 2018) diff --git a/website/source/docs/job-specification/service.html.md b/website/source/docs/job-specification/service.html.md index 2c465ca48b5..f83b30afbcd 100644 --- a/website/source/docs/job-specification/service.html.md +++ b/website/source/docs/job-specification/service.html.md @@ -72,7 +72,7 @@ does not automatically enable service discovery. - `check` ([Check](#check-parameters): nil) - Specifies a health check associated with the service. This can be specified multiple times to define multiple checks for the service. At this time, Nomad supports the - `script`<sup><small>1</small></sup>, `http` and `tcp` checks. + `grpc`, `http`, `script`<sup><small>1</small></sup>, and `tcp` checks. - `name` `(string: "<job>-<group>-<task>")` - Specifies the name this service will be advertised as in Consul. If not supplied, this will default to the @@ -160,6 +160,12 @@ scripts. parameter. To achieve the behavior of shell operators, specify the command as a shell, like `/bin/bash` and then use `args` to run the check. +- `grpc_service` `(string: <optional>)` - What service, if any, to specify in + the gRPC health check. gRPC health checks require Consul 1.0.5 or later. + +- `grpc_use_tls` `(bool: false)` - Use TLS to perform a gRPC health check. May + be used with `tls_skip_verify` to use TLS but skip certificate verification. + - `initial_status` `(string: <enum>)` - Specifies the originating status of the service. Valid options are the empty string, `passing`, `warning`, and `critical`. @@ -186,9 +192,9 @@ scripts. in the [`network`][network] stanza.
If a port value was declared on the `service`, this will inherit from that value if not supplied. If supplied, this value takes precedence over the `service.port` value. This is useful for - services which operate on multiple ports. `http` and `tcp` checks require a - port while `script` checks do not. Checks will use the host IP and ports by - default. In Nomad 0.7.1 or later numeric ports may be used if + services which operate on multiple ports. `grpc`, `http`, and `tcp` checks + require a port while `script` checks do not. Checks will use the host IP and + ports by default. In Nomad 0.7.1 or later numeric ports may be used if `address_mode="driver"` is set on the check. - `protocol` `(string: "http")` - Specifies the protocol for the http-based @@ -199,7 +205,8 @@ scripts. "30s" or "1h". This must be greater than or equal to "1s" - `type` `(string: <required>)` - This indicates the check types supported by - Nomad. Valid options are `script`, `http`, and `tcp`. + Nomad. Valid options are `grpc`, `http`, `script`, and `tcp`. gRPC health + checks require Consul 1.0.5 or later. - `tls_skip_verify` `(bool: false)` - Skip verifying TLS certificates for HTTPS checks. Requires Consul >= 0.7.2. @@ -355,6 +362,33 @@ service { } ``` +### gRPC Health Check + +gRPC health checks use the same host and port behavior as `http` and `tcp` +checks, but gRPC checks also have an optional gRPC service to health check. Not +all gRPC applications require a service to health check. gRPC health checks +require Consul 1.0.5 or later. + +```hcl +service { + check { + type = "grpc" + port = "rpc" + interval = "5s" + timeout = "2s" + grpc_service = "grpc.health.v1.Health" + grpc_use_tls = true + tls_skip_verify = true + } +} +``` + +This check would translate to having a Consul check registration with the +[GRPC][consul_grpc] parameter similar to `10.0.3.1:4567/grpc.health.v1.Health`. +Assuming the service's address is `10.0.3.1` and port is `4567`.
See [Using +Driver Address Mode](#using-driver-address-mode) for details on address +selection. + ### Using Driver Address Mode The [Docker](/docs/drivers/docker.html#network_mode) and @@ -582,6 +616,7 @@ advertise and check directly since Nomad isn't managing any port assignments. system of a task for that driver. [check_restart_stanza]: /docs/job-specification/check_restart.html "check_restart stanza" +[consul_grpc]: https://www.consul.io/api/agent/check.html#grpc [service-discovery]: /docs/service-discovery/index.html "Nomad Service Discovery" [interpolation]: /docs/runtime/interpolation.html "Nomad Runtime Interpolation" [network]: /docs/job-specification/network.html "Nomad network Job Specification" From e8ad712fa285c49245a331fd066faca330d1463c Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Thu, 3 May 2018 16:54:42 -0700 Subject: [PATCH 5/6] tests: fix grpc fields in task diff --- nomad/structs/diff_test.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go index 9862cfdbf66..f3b3557f984 100644 --- a/nomad/structs/diff_test.go +++ b/nomad/structs/diff_test.go @@ -3557,6 +3557,12 @@ func TestTaskDiff(t *testing.T) { Old: "", New: "foo", }, + { + Type: DiffTypeAdded, + Name: "GRPCUseTLS", + Old: "", + New: "false", + }, { Type: DiffTypeAdded, Name: "Interval", @@ -3611,6 +3617,12 @@ func TestTaskDiff(t *testing.T) { Old: "foo", New: "", }, + { + Type: DiffTypeDeleted, + Name: "GRPCUseTLS", + Old: "false", + New: "", + }, { Type: DiffTypeDeleted, Name: "Interval", @@ -3767,6 +3779,18 @@ func TestTaskDiff(t *testing.T) { Old: "foo", New: "foo", }, + { + Type: DiffTypeNone, + Name: "GRPCService", + Old: "", + New: "", + }, + { + Type: DiffTypeNone, + Name: "GRPCUseTLS", + Old: "false", + New: "false", + }, { Type: DiffTypeEdited, Name: "InitialStatus", From 4197bc84b1bb24c03d28f64e9af1c3b4bc34005e Mon Sep 17 00:00:00 2001 From: Michael Schurter Date: Fri, 4 May 2018 11:14:20 
-0700 Subject: [PATCH 6/6] docs: try to make grpc explanation less confusing --- website/source/docs/job-specification/service.html.md | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/website/source/docs/job-specification/service.html.md b/website/source/docs/job-specification/service.html.md index f83b30afbcd..7ebbe4c5b87 100644 --- a/website/source/docs/job-specification/service.html.md +++ b/website/source/docs/job-specification/service.html.md @@ -376,17 +376,16 @@ service { port = "rpc" interval = "5s" timeout = "2s" - grpc_service = "grpc.health.v1.Health" + grpc_service = "example.Service" grpc_use_tls = true tls_skip_verify = true } } ``` -This check would translate to having a Consul check registration with the -[GRPC][consul_grpc] parameter similar to `10.0.3.1:4567/grpc.health.v1.Health`. -Assuming the service's address is `10.0.3.1` and port is `4567`. See [Using -Driver Address Mode](#using-driver-address-mode) for details on address +In this example Consul would health check the `example.Service` service on the +`rpc` port defined in the task's [network resources][network] stanza. See +[Using Driver Address Mode](#using-driver-address-mode) for details on address selection. ### Using Driver Address Mode