diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4866b2fea18..e5a03067de0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,8 +2,9 @@
IMPROVEMENTS:
* cli: Add node drain details to node status [[GH-4247](https://github.com/hashicorp/nomad/issues/4247)]
- * command: add -short option to init command that emits a minimal
+ * command: Add -short option to init command that emits a minimal
jobspec [[GH-4239](https://github.com/hashicorp/nomad/issues/4239)]
+ * discovery: Support Consul gRPC health checks [[GH-4251](https://github.com/hashicorp/nomad/issues/4251)]
## 0.8.3 (April 27, 2018)
diff --git a/api/tasks.go b/api/tasks.go
index 4895312efcf..e6cb9863e8b 100644
--- a/api/tasks.go
+++ b/api/tasks.go
@@ -286,6 +286,8 @@ type ServiceCheck struct {
Header map[string][]string
Method string
CheckRestart *CheckRestart `mapstructure:"check_restart"`
+ GRPCService string `mapstructure:"grpc_service"`
+ GRPCUseTLS bool `mapstructure:"grpc_use_tls"`
}
// The Service model represents a Consul service definition
diff --git a/client/task_runner.go b/client/task_runner.go
index 2a0303576ae..ef893436a1f 100644
--- a/client/task_runner.go
+++ b/client/task_runner.go
@@ -1516,6 +1516,7 @@ func interpolateServices(taskEnv *env.TaskEnv, task *structs.Task) *structs.Task
check.PortLabel = taskEnv.ReplaceEnv(check.PortLabel)
check.InitialStatus = taskEnv.ReplaceEnv(check.InitialStatus)
check.Method = taskEnv.ReplaceEnv(check.Method)
+ check.GRPCService = taskEnv.ReplaceEnv(check.GRPCService)
if len(check.Header) > 0 {
header := make(map[string][]string, len(check.Header))
for k, vs := range check.Header {
diff --git a/command/agent/consul/client.go b/command/agent/consul/client.go
index cc0a26d411e..660a2e876de 100644
--- a/command/agent/consul/client.go
+++ b/command/agent/consul/client.go
@@ -1094,12 +1094,22 @@ func createCheckReg(serviceID, checkID string, check *structs.ServiceCheck, host
chkReg.HTTP = url.String()
chkReg.Method = check.Method
chkReg.Header = check.Header
+
case structs.ServiceCheckTCP:
chkReg.TCP = net.JoinHostPort(host, strconv.Itoa(port))
+
case structs.ServiceCheckScript:
chkReg.TTL = (check.Interval + ttlCheckBuffer).String()
// As of Consul 1.0.0 setting TTL and Interval is a 400
chkReg.Interval = ""
+
+ case structs.ServiceCheckGRPC:
+ chkReg.GRPC = fmt.Sprintf("%s/%s", net.JoinHostPort(host, strconv.Itoa(port)), check.GRPCService)
+ chkReg.GRPCUseTLS = check.GRPCUseTLS
+ if check.TLSSkipVerify {
+ chkReg.TLSSkipVerify = true
+ }
+
default:
return nil, fmt.Errorf("check type %+q not valid", check.Type)
}
diff --git a/command/agent/consul/unit_test.go b/command/agent/consul/unit_test.go
index 9f72ee2ea21..37157fcdcdd 100644
--- a/command/agent/consul/unit_test.go
+++ b/command/agent/consul/unit_test.go
@@ -1391,9 +1391,10 @@ func TestIsNomadService(t *testing.T) {
}
}
-// TestCreateCheckReg asserts Nomad ServiceCheck structs are properly converted
-// to Consul API AgentCheckRegistrations.
-func TestCreateCheckReg(t *testing.T) {
+// TestCreateCheckReg_HTTP asserts Nomad ServiceCheck structs are properly
+// converted to Consul API AgentCheckRegistrations for HTTP checks.
+func TestCreateCheckReg_HTTP(t *testing.T) {
+ t.Parallel()
check := &structs.ServiceCheck{
Name: "name",
Type: "http",
@@ -1435,6 +1436,42 @@ func TestCreateCheckReg(t *testing.T) {
}
}
+// TestCreateCheckReg_GRPC asserts Nomad ServiceCheck structs are properly
+// converted to Consul API AgentCheckRegistrations for GRPC checks.
+func TestCreateCheckReg_GRPC(t *testing.T) {
+ t.Parallel()
+ check := &structs.ServiceCheck{
+ Name: "name",
+ Type: "grpc",
+ PortLabel: "label",
+ GRPCService: "foo.Bar",
+ GRPCUseTLS: true,
+ TLSSkipVerify: true,
+ Timeout: time.Second,
+ Interval: time.Minute,
+ }
+
+ serviceID := "testService"
+ checkID := check.Hash(serviceID)
+
+ expected := &api.AgentCheckRegistration{
+ ID: checkID,
+ Name: "name",
+ ServiceID: serviceID,
+ AgentServiceCheck: api.AgentServiceCheck{
+ Timeout: "1s",
+ Interval: "1m0s",
+ GRPC: "localhost:8080/foo.Bar",
+ GRPCUseTLS: true,
+ TLSSkipVerify: true,
+ },
+ }
+
+ actual, err := createCheckReg(serviceID, checkID, check, "localhost", 8080)
+ require.NoError(t, err)
+ require.Equal(t, expected, actual)
+}
+
// TestGetAddress asserts Nomad uses the correct ip and port for services and
// checks depending on port labels, driver networks, and address mode.
func TestGetAddress(t *testing.T) {
diff --git a/command/agent/job_endpoint.go b/command/agent/job_endpoint.go
index 2a0a92b1763..510680002d2 100644
--- a/command/agent/job_endpoint.go
+++ b/command/agent/job_endpoint.go
@@ -764,6 +764,8 @@ func ApiTaskToStructsTask(apiTask *api.Task, structsTask *structs.Task) {
TLSSkipVerify: check.TLSSkipVerify,
Header: check.Header,
Method: check.Method,
+ GRPCService: check.GRPCService,
+ GRPCUseTLS: check.GRPCUseTLS,
}
if check.CheckRestart != nil {
structsTask.Services[i].Checks[j].CheckRestart = &structs.CheckRestart{
diff --git a/command/agent/job_endpoint_test.go b/command/agent/job_endpoint_test.go
index 1c80d6fac8f..fb129dc918b 100644
--- a/command/agent/job_endpoint_test.go
+++ b/command/agent/job_endpoint_test.go
@@ -1272,6 +1272,8 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Protocol: "http",
PortLabel: "foo",
AddressMode: "driver",
+ GRPCService: "foo.Bar",
+ GRPCUseTLS: true,
Interval: 4 * time.Second,
Timeout: 2 * time.Second,
InitialStatus: "ok",
@@ -1493,6 +1495,8 @@ func TestJobs_ApiJobToStructsJob(t *testing.T) {
Interval: 4 * time.Second,
Timeout: 2 * time.Second,
InitialStatus: "ok",
+ GRPCService: "foo.Bar",
+ GRPCUseTLS: true,
CheckRestart: &structs.CheckRestart{
Limit: 3,
Grace: 11 * time.Second,
diff --git a/jobspec/parse.go b/jobspec/parse.go
index e56161cd4c4..63157140e2e 100644
--- a/jobspec/parse.go
+++ b/jobspec/parse.go
@@ -1061,6 +1061,8 @@ func parseChecks(service *api.Service, checkObjs *ast.ObjectList) error {
"method",
"check_restart",
"address_mode",
+ "grpc_service",
+ "grpc_use_tls",
}
if err := helper.CheckHCLKeys(co.Val, valid); err != nil {
return multierror.Prefix(err, "check ->")
diff --git a/jobspec/parse_test.go b/jobspec/parse_test.go
index 6a705bef6c0..69875f7713d 100644
--- a/jobspec/parse_test.go
+++ b/jobspec/parse_test.go
@@ -135,11 +135,13 @@ func TestParse(t *testing.T) {
PortLabel: "http",
Checks: []api.ServiceCheck{
{
- Name: "check-name",
- Type: "tcp",
- PortLabel: "admin",
- Interval: 10 * time.Second,
- Timeout: 2 * time.Second,
+ Name: "check-name",
+ Type: "tcp",
+ PortLabel: "admin",
+ Interval: 10 * time.Second,
+ Timeout: 2 * time.Second,
+ GRPCService: "foo.Bar",
+ GRPCUseTLS: true,
CheckRestart: &api.CheckRestart{
Limit: 3,
Grace: helper.TimeToPtr(10 * time.Second),
diff --git a/jobspec/test-fixtures/basic.hcl b/jobspec/test-fixtures/basic.hcl
index 2b3f973aa9c..11f40b883ad 100644
--- a/jobspec/test-fixtures/basic.hcl
+++ b/jobspec/test-fixtures/basic.hcl
@@ -102,11 +102,13 @@ job "binstore-storagelocker" {
port = "http"
check {
- name = "check-name"
- type = "tcp"
- interval = "10s"
- timeout = "2s"
- port = "admin"
+ name = "check-name"
+ type = "tcp"
+ interval = "10s"
+ timeout = "2s"
+ port = "admin"
+ grpc_service = "foo.Bar"
+ grpc_use_tls = true
check_restart {
limit = 3
diff --git a/nomad/structs/diff_test.go b/nomad/structs/diff_test.go
index 9862cfdbf66..f3b3557f984 100644
--- a/nomad/structs/diff_test.go
+++ b/nomad/structs/diff_test.go
@@ -3557,6 +3557,12 @@ func TestTaskDiff(t *testing.T) {
Old: "",
New: "foo",
},
+ {
+ Type: DiffTypeAdded,
+ Name: "GRPCUseTLS",
+ Old: "",
+ New: "false",
+ },
{
Type: DiffTypeAdded,
Name: "Interval",
@@ -3611,6 +3617,12 @@ func TestTaskDiff(t *testing.T) {
Old: "foo",
New: "",
},
+ {
+ Type: DiffTypeDeleted,
+ Name: "GRPCUseTLS",
+ Old: "false",
+ New: "",
+ },
{
Type: DiffTypeDeleted,
Name: "Interval",
@@ -3767,6 +3779,18 @@ func TestTaskDiff(t *testing.T) {
Old: "foo",
New: "foo",
},
+ {
+ Type: DiffTypeNone,
+ Name: "GRPCService",
+ Old: "",
+ New: "",
+ },
+ {
+ Type: DiffTypeNone,
+ Name: "GRPCUseTLS",
+ Old: "false",
+ New: "false",
+ },
{
Type: DiffTypeEdited,
Name: "InitialStatus",
diff --git a/nomad/structs/structs.go b/nomad/structs/structs.go
index 601942951ae..40a9a34d37e 100644
--- a/nomad/structs/structs.go
+++ b/nomad/structs/structs.go
@@ -3515,6 +3515,7 @@ const (
ServiceCheckHTTP = "http"
ServiceCheckTCP = "tcp"
ServiceCheckScript = "script"
+ ServiceCheckGRPC = "grpc"
// minCheckInterval is the minimum check interval permitted. Consul
// currently has its MinInterval set to 1s. Mirror that here for
@@ -3544,6 +3545,8 @@ type ServiceCheck struct {
Method string // HTTP Method to use (GET by default)
Header map[string][]string // HTTP Headers for Consul to set when making HTTP checks
CheckRestart *CheckRestart // If and when a task should be restarted based on checks
+ GRPCService string // Service for GRPC checks
+ GRPCUseTLS bool // Whether or not to use TLS for GRPC checks
}
func (sc *ServiceCheck) Copy() *ServiceCheck {
@@ -3584,6 +3587,7 @@ func (sc *ServiceCheck) Canonicalize(serviceName string) {
func (sc *ServiceCheck) validate() error {
// Validate Type
switch strings.ToLower(sc.Type) {
+ case ServiceCheckGRPC:
case ServiceCheckTCP:
case ServiceCheckHTTP:
if sc.Path == "" {
@@ -3601,6 +3605,7 @@ func (sc *ServiceCheck) validate() error {
if sc.Command == "" {
return fmt.Errorf("script type must have a valid script path")
}
+
default:
- return fmt.Errorf(`invalid type (%+q), must be one of "http", "tcp", or "script" type`, sc.Type)
+ return fmt.Errorf(`invalid type (%+q), must be one of "grpc", "http", "tcp", or "script" type`, sc.Type)
}
@@ -3645,7 +3650,7 @@ func (sc *ServiceCheck) validate() error {
// RequiresPort returns whether the service check requires the task has a port.
func (sc *ServiceCheck) RequiresPort() bool {
switch sc.Type {
- case ServiceCheckHTTP, ServiceCheckTCP:
+ case ServiceCheckGRPC, ServiceCheckHTTP, ServiceCheckTCP:
return true
default:
return false
@@ -3696,6 +3701,14 @@ func (sc *ServiceCheck) Hash(serviceID string) string {
io.WriteString(h, sc.AddressMode)
}
+ // Only include GRPC if set to maintain ID stability with Nomad <0.8.4
+ if sc.GRPCService != "" {
+ io.WriteString(h, sc.GRPCService)
+ }
+ if sc.GRPCUseTLS {
+ io.WriteString(h, "true")
+ }
+
return fmt.Sprintf("%x", h.Sum(nil))
}
diff --git a/nomad/structs/structs_test.go b/nomad/structs/structs_test.go
index b8e63a023a7..ec65fe755af 100644
--- a/nomad/structs/structs_test.go
+++ b/nomad/structs/structs_test.go
@@ -1267,7 +1267,34 @@ func TestTask_Validate_Service_Check_AddressMode(t *testing.T) {
}
}
+func TestTask_Validate_Service_Check_GRPC(t *testing.T) {
+ t.Parallel()
+ // Bad (no port)
+ invalidGRPC := &ServiceCheck{
+ Type: ServiceCheckGRPC,
+ Interval: time.Second,
+ Timeout: time.Second,
+ }
+ service := &Service{
+ Name: "test",
+ Checks: []*ServiceCheck{invalidGRPC},
+ }
+
+ assert.Error(t, service.Validate())
+
+ // Good
+ service.Checks[0] = &ServiceCheck{
+ Type: ServiceCheckGRPC,
+ Interval: time.Second,
+ Timeout: time.Second,
+ PortLabel: "some-port-label",
+ }
+
+ assert.NoError(t, service.Validate())
+}
+
func TestTask_Validate_Service_Check_CheckRestart(t *testing.T) {
+ t.Parallel()
invalidCheckRestart := &CheckRestart{
Limit: -1,
Grace: -1,
diff --git a/vendor/github.com/hashicorp/consul/api/acl.go b/vendor/github.com/hashicorp/consul/api/acl.go
index 6ea0a752e58..8ec9aa58557 100644
--- a/vendor/github.com/hashicorp/consul/api/acl.go
+++ b/vendor/github.com/hashicorp/consul/api/acl.go
@@ -5,7 +5,7 @@ import (
)
const (
- // ACLCLientType is the client type token
+ // ACLClientType is the client type token
ACLClientType = "client"
// ACLManagementType is the management type token
diff --git a/vendor/github.com/hashicorp/consul/api/agent.go b/vendor/github.com/hashicorp/consul/api/agent.go
index ac57415c151..b42baed41d3 100644
--- a/vendor/github.com/hashicorp/consul/api/agent.go
+++ b/vendor/github.com/hashicorp/consul/api/agent.go
@@ -15,6 +15,7 @@ type AgentCheck struct {
Output string
ServiceID string
ServiceName string
+ Definition HealthCheckDefinition
}
// AgentService represents a service known to the agent
@@ -59,12 +60,13 @@ type MembersOpts struct {
// AgentServiceRegistration is used to register a new service
type AgentServiceRegistration struct {
- ID string `json:",omitempty"`
- Name string `json:",omitempty"`
- Tags []string `json:",omitempty"`
- Port int `json:",omitempty"`
- Address string `json:",omitempty"`
- EnableTagOverride bool `json:",omitempty"`
+ ID string `json:",omitempty"`
+ Name string `json:",omitempty"`
+ Tags []string `json:",omitempty"`
+ Port int `json:",omitempty"`
+ Address string `json:",omitempty"`
+ EnableTagOverride bool `json:",omitempty"`
+ Meta map[string]string `json:",omitempty"`
Check *AgentServiceCheck
Checks AgentServiceChecks
}
@@ -80,7 +82,10 @@ type AgentCheckRegistration struct {
// AgentServiceCheck is used to define a node or service level check
type AgentServiceCheck struct {
- Script string `json:",omitempty"`
+ CheckID string `json:",omitempty"`
+ Name string `json:",omitempty"`
+ Args []string `json:"ScriptArgs,omitempty"`
+ Script string `json:",omitempty"` // Deprecated, use Args.
DockerContainerID string `json:",omitempty"`
Shell string `json:",omitempty"` // Only supported for Docker.
Interval string `json:",omitempty"`
@@ -93,6 +98,8 @@ type AgentServiceCheck struct {
Status string `json:",omitempty"`
Notes string `json:",omitempty"`
TLSSkipVerify bool `json:",omitempty"`
+ GRPC string `json:",omitempty"`
+ GRPCUseTLS bool `json:",omitempty"`
// In Consul 0.7 and later, checks that are associated with a service
// may also contain this optional DeregisterCriticalServiceAfter field,
@@ -580,36 +587,36 @@ func (a *Agent) Monitor(loglevel string, stopCh <-chan struct{}, q *QueryOptions
// UpdateACLToken updates the agent's "acl_token". See updateToken for more
// details.
-func (c *Agent) UpdateACLToken(token string, q *WriteOptions) (*WriteMeta, error) {
- return c.updateToken("acl_token", token, q)
+func (a *Agent) UpdateACLToken(token string, q *WriteOptions) (*WriteMeta, error) {
+ return a.updateToken("acl_token", token, q)
}
// UpdateACLAgentToken updates the agent's "acl_agent_token". See updateToken
// for more details.
-func (c *Agent) UpdateACLAgentToken(token string, q *WriteOptions) (*WriteMeta, error) {
- return c.updateToken("acl_agent_token", token, q)
+func (a *Agent) UpdateACLAgentToken(token string, q *WriteOptions) (*WriteMeta, error) {
+ return a.updateToken("acl_agent_token", token, q)
}
// UpdateACLAgentMasterToken updates the agent's "acl_agent_master_token". See
// updateToken for more details.
-func (c *Agent) UpdateACLAgentMasterToken(token string, q *WriteOptions) (*WriteMeta, error) {
- return c.updateToken("acl_agent_master_token", token, q)
+func (a *Agent) UpdateACLAgentMasterToken(token string, q *WriteOptions) (*WriteMeta, error) {
+ return a.updateToken("acl_agent_master_token", token, q)
}
// UpdateACLReplicationToken updates the agent's "acl_replication_token". See
// updateToken for more details.
-func (c *Agent) UpdateACLReplicationToken(token string, q *WriteOptions) (*WriteMeta, error) {
- return c.updateToken("acl_replication_token", token, q)
+func (a *Agent) UpdateACLReplicationToken(token string, q *WriteOptions) (*WriteMeta, error) {
+ return a.updateToken("acl_replication_token", token, q)
}
// updateToken can be used to update an agent's ACL token after the agent has
// started. The tokens are not persisted, so will need to be updated again if
// the agent is restarted.
-func (c *Agent) updateToken(target, token string, q *WriteOptions) (*WriteMeta, error) {
- r := c.c.newRequest("PUT", fmt.Sprintf("/v1/agent/token/%s", target))
+func (a *Agent) updateToken(target, token string, q *WriteOptions) (*WriteMeta, error) {
+ r := a.c.newRequest("PUT", fmt.Sprintf("/v1/agent/token/%s", target))
r.setWriteOptions(q)
r.obj = &AgentToken{Token: token}
- rtt, resp, err := requireOK(c.c.doRequest(r))
+ rtt, resp, err := requireOK(a.c.doRequest(r))
if err != nil {
return nil, err
}
diff --git a/vendor/github.com/hashicorp/consul/api/api.go b/vendor/github.com/hashicorp/consul/api/api.go
index 87cd3c5a377..1cdc21e3317 100644
--- a/vendor/github.com/hashicorp/consul/api/api.go
+++ b/vendor/github.com/hashicorp/consul/api/api.go
@@ -101,7 +101,7 @@ type QueryOptions struct {
// be provided for filtering.
NodeMeta map[string]string
- // RelayFactor is used in keyring operations to cause reponses to be
+ // RelayFactor is used in keyring operations to cause responses to be
// relayed back to the sender through N other random nodes. Must be
// a value from 0 to 5 (inclusive).
RelayFactor uint8
@@ -137,7 +137,7 @@ type WriteOptions struct {
// which overrides the agent's default token.
Token string
- // RelayFactor is used in keyring operations to cause reponses to be
+ // RelayFactor is used in keyring operations to cause responses to be
// relayed back to the sender through N other random nodes. Must be
// a value from 0 to 5 (inclusive).
RelayFactor uint8
@@ -377,12 +377,14 @@ func SetupTLSConfig(tlsConfig *TLSConfig) (*tls.Config, error) {
tlsClientConfig.Certificates = []tls.Certificate{tlsCert}
}
- rootConfig := &rootcerts.Config{
- CAFile: tlsConfig.CAFile,
- CAPath: tlsConfig.CAPath,
- }
- if err := rootcerts.ConfigureTLS(tlsClientConfig, rootConfig); err != nil {
- return nil, err
+ if tlsConfig.CAFile != "" || tlsConfig.CAPath != "" {
+ rootConfig := &rootcerts.Config{
+ CAFile: tlsConfig.CAFile,
+ CAPath: tlsConfig.CAPath,
+ }
+ if err := rootcerts.ConfigureTLS(tlsClientConfig, rootConfig); err != nil {
+ return nil, err
+ }
}
return tlsClientConfig, nil
@@ -477,6 +479,14 @@ func NewHttpClient(transport *http.Transport, tlsConf TLSConfig) (*http.Client,
Transport: transport,
}
+ // TODO (slackpad) - Once we get some run time on the HTTP/2 support we
+ // should turn it on by default if TLS is enabled. We would basically
+ // just need to call http2.ConfigureTransport(transport) here. We also
+ // don't want to introduce another external dependency on
+ // golang.org/x/net/http2 at this time. For a complete recipe for how
+ // to enable HTTP/2 support on a transport suitable for the API client
+ // library see agent/http_test.go:TestHTTPServer_H2.
+
if transport.TLSClientConfig == nil {
tlsClientConfig, err := SetupTLSConfig(&tlsConf)
@@ -623,9 +633,9 @@ func (r *request) toHTTP() (*http.Request, error) {
}
if r.ctx != nil {
return req.WithContext(r.ctx), nil
- } else {
- return req, nil
}
+
+ return req, nil
}
// newRequest is used to create a new request
@@ -661,7 +671,7 @@ func (c *Client) doRequest(r *request) (time.Duration, *http.Response, error) {
}
start := time.Now()
resp, err := c.config.HttpClient.Do(req)
- diff := time.Now().Sub(start)
+ diff := time.Since(start)
return diff, resp, err
}
diff --git a/vendor/github.com/hashicorp/consul/api/catalog.go b/vendor/github.com/hashicorp/consul/api/catalog.go
index babfc9a1df4..80ce1bc8156 100644
--- a/vendor/github.com/hashicorp/consul/api/catalog.go
+++ b/vendor/github.com/hashicorp/consul/api/catalog.go
@@ -22,6 +22,7 @@ type CatalogService struct {
ServiceName string
ServiceAddress string
ServiceTags []string
+ ServiceMeta map[string]string
ServicePort int
ServiceEnableTagOverride bool
CreateIndex uint64
@@ -42,6 +43,7 @@ type CatalogRegistration struct {
Datacenter string
Service *AgentService
Check *AgentCheck
+ SkipNodeUpdate bool
}
type CatalogDeregistration struct {
diff --git a/vendor/github.com/hashicorp/consul/api/coordinate.go b/vendor/github.com/hashicorp/consul/api/coordinate.go
index 90214e392ce..53318f11dd5 100644
--- a/vendor/github.com/hashicorp/consul/api/coordinate.go
+++ b/vendor/github.com/hashicorp/consul/api/coordinate.go
@@ -66,3 +66,41 @@ func (c *Coordinate) Nodes(q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, err
}
return out, qm, nil
}
+
+// Update inserts or updates the LAN coordinate of a node.
+func (c *Coordinate) Update(coord *CoordinateEntry, q *WriteOptions) (*WriteMeta, error) {
+ r := c.c.newRequest("PUT", "/v1/coordinate/update")
+ r.setWriteOptions(q)
+ r.obj = coord
+ rtt, resp, err := requireOK(c.c.doRequest(r))
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+
+ wm := &WriteMeta{}
+ wm.RequestTime = rtt
+
+ return wm, nil
+}
+
+// Node is used to return the coordinates of a single node in the LAN pool.
+func (c *Coordinate) Node(node string, q *QueryOptions) ([]*CoordinateEntry, *QueryMeta, error) {
+ r := c.c.newRequest("GET", "/v1/coordinate/node/"+node)
+ r.setQueryOptions(q)
+ rtt, resp, err := requireOK(c.c.doRequest(r))
+ if err != nil {
+ return nil, nil, err
+ }
+ defer resp.Body.Close()
+
+ qm := &QueryMeta{}
+ parseQueryMeta(resp, qm)
+ qm.RequestTime = rtt
+
+ var out []*CoordinateEntry
+ if err := decodeBody(resp, &out); err != nil {
+ return nil, nil, err
+ }
+ return out, qm, nil
+}
diff --git a/vendor/github.com/hashicorp/consul/api/health.go b/vendor/github.com/hashicorp/consul/api/health.go
index 38c105fdb93..53f3de4f795 100644
--- a/vendor/github.com/hashicorp/consul/api/health.go
+++ b/vendor/github.com/hashicorp/consul/api/health.go
@@ -34,6 +34,21 @@ type HealthCheck struct {
ServiceID string
ServiceName string
ServiceTags []string
+
+ Definition HealthCheckDefinition
+}
+
+// HealthCheckDefinition is used to store the details about
+// a health check's execution.
+type HealthCheckDefinition struct {
+ HTTP string
+ Header map[string][]string
+ Method string
+ TLSSkipVerify bool
+ TCP string
+ Interval ReadableDuration
+ Timeout ReadableDuration
+ DeregisterCriticalServiceAfter ReadableDuration
}
// HealthChecks is a collection of HealthCheck structs.
diff --git a/vendor/github.com/hashicorp/consul/api/kv.go b/vendor/github.com/hashicorp/consul/api/kv.go
index 5990d3d13a5..97f51568559 100644
--- a/vendor/github.com/hashicorp/consul/api/kv.go
+++ b/vendor/github.com/hashicorp/consul/api/kv.go
@@ -252,7 +252,7 @@ func (k *KV) put(key string, params map[string]string, body []byte, q *WriteOpti
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, fmt.Errorf("Failed to read response: %v", err)
}
- res := strings.Contains(string(buf.Bytes()), "true")
+ res := strings.Contains(buf.String(), "true")
return res, qm, nil
}
@@ -296,7 +296,7 @@ func (k *KV) deleteInternal(key string, params map[string]string, q *WriteOption
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, nil, fmt.Errorf("Failed to read response: %v", err)
}
- res := strings.Contains(string(buf.Bytes()), "true")
+ res := strings.Contains(buf.String(), "true")
return res, qm, nil
}
diff --git a/vendor/github.com/hashicorp/consul/api/lock.go b/vendor/github.com/hashicorp/consul/api/lock.go
index d3187113f72..41f72e7d23a 100644
--- a/vendor/github.com/hashicorp/consul/api/lock.go
+++ b/vendor/github.com/hashicorp/consul/api/lock.go
@@ -180,7 +180,7 @@ WAIT:
// Handle the one-shot mode.
if l.opts.LockTryOnce && attempts > 0 {
- elapsed := time.Now().Sub(start)
+ elapsed := time.Since(start)
if elapsed > qOpts.WaitTime {
return nil, nil
}
diff --git a/vendor/github.com/hashicorp/consul/api/operator_autopilot.go b/vendor/github.com/hashicorp/consul/api/operator_autopilot.go
index 0fa9d160403..b179406dc12 100644
--- a/vendor/github.com/hashicorp/consul/api/operator_autopilot.go
+++ b/vendor/github.com/hashicorp/consul/api/operator_autopilot.go
@@ -196,7 +196,7 @@ func (op *Operator) AutopilotCASConfiguration(conf *AutopilotConfiguration, q *W
if _, err := io.Copy(&buf, resp.Body); err != nil {
return false, fmt.Errorf("Failed to read response: %v", err)
}
- res := strings.Contains(string(buf.Bytes()), "true")
+ res := strings.Contains(buf.String(), "true")
return res, nil
}
diff --git a/vendor/github.com/hashicorp/consul/api/prepared_query.go b/vendor/github.com/hashicorp/consul/api/prepared_query.go
index ff210de3f00..d322dd86794 100644
--- a/vendor/github.com/hashicorp/consul/api/prepared_query.go
+++ b/vendor/github.com/hashicorp/consul/api/prepared_query.go
@@ -34,6 +34,12 @@ type ServiceQuery struct {
// local datacenter.
Failover QueryDatacenterOptions
+ // IgnoreCheckIDs is an optional list of health check IDs to ignore when
+ // considering which nodes are healthy. It is useful as an emergency measure
+ // to temporarily override some health check that is producing false negatives
+ // for example.
+ IgnoreCheckIDs []string
+
// If OnlyPassing is true then we will only include nodes with passing
// health checks (critical AND warning checks will cause a node to be
// discarded)
@@ -61,7 +67,7 @@ type QueryTemplate struct {
Regexp string
}
-// PrepatedQueryDefinition defines a complete prepared query.
+// PreparedQueryDefinition defines a complete prepared query.
type PreparedQueryDefinition struct {
// ID is this UUID-based ID for the query, always generated by Consul.
ID string
diff --git a/vendor/github.com/hashicorp/consul/api/semaphore.go b/vendor/github.com/hashicorp/consul/api/semaphore.go
index d848dfd0b19..d0c57417788 100644
--- a/vendor/github.com/hashicorp/consul/api/semaphore.go
+++ b/vendor/github.com/hashicorp/consul/api/semaphore.go
@@ -198,7 +198,7 @@ WAIT:
// Handle the one-shot mode.
if s.opts.SemaphoreTryOnce && attempts > 0 {
- elapsed := time.Now().Sub(start)
+ elapsed := time.Since(start)
if elapsed > qOpts.WaitTime {
return nil, nil
}
diff --git a/vendor/github.com/hashicorp/consul/command/flags/http.go b/vendor/github.com/hashicorp/consul/command/flags/http.go
index 1fa78e16713..591567a4f0c 100644
--- a/vendor/github.com/hashicorp/consul/command/flags/http.go
+++ b/vendor/github.com/hashicorp/consul/command/flags/http.go
@@ -87,6 +87,12 @@ func (f *HTTPFlags) Token() string {
func (f *HTTPFlags) APIClient() (*api.Client, error) {
c := api.DefaultConfig()
+ f.MergeOntoConfig(c)
+
+ return api.NewClient(c)
+}
+
+func (f *HTTPFlags) MergeOntoConfig(c *api.Config) {
f.address.Merge(&c.Address)
f.token.Merge(&c.Token)
f.caFile.Merge(&c.TLSConfig.CAFile)
@@ -95,6 +101,4 @@ func (f *HTTPFlags) APIClient() (*api.Client, error) {
f.keyFile.Merge(&c.TLSConfig.KeyFile)
f.tlsServerName.Merge(&c.TLSConfig.Address)
f.datacenter.Merge(&c.Datacenter)
-
- return api.NewClient(c)
}
diff --git a/vendor/github.com/hashicorp/consul/lib/freeport/freeport.go b/vendor/github.com/hashicorp/consul/lib/freeport/freeport.go
index 882998314f5..806449ba4a0 100644
--- a/vendor/github.com/hashicorp/consul/lib/freeport/freeport.go
+++ b/vendor/github.com/hashicorp/consul/lib/freeport/freeport.go
@@ -16,7 +16,7 @@ const (
// blockSize is the size of the allocated port block. ports are given out
// consecutively from that block with roll-over for the lifetime of the
// application/test run.
- blockSize = 500
+ blockSize = 1500
// maxBlocks is the number of available port blocks.
// lowPort + maxBlocks * blockSize must be less than 65535.
diff --git a/vendor/github.com/hashicorp/consul/lib/rtt.go b/vendor/github.com/hashicorp/consul/lib/rtt.go
index a417f533c91..fb9090ab1db 100644
--- a/vendor/github.com/hashicorp/consul/lib/rtt.go
+++ b/vendor/github.com/hashicorp/consul/lib/rtt.go
@@ -36,14 +36,14 @@ func (cs CoordinateSet) Intersect(other CoordinateSet) (*coordinate.Coordinate,
// we are possibly a client. Any node with more than one segment can only
// be a server, which means it should be in all segments.
if len(cs) == 1 {
- for s, _ := range cs {
+ for s := range cs {
segment = s
}
}
// Likewise for the other set.
if len(other) == 1 {
- for s, _ := range other {
+ for s := range other {
segment = s
}
}
diff --git a/vendor/github.com/hashicorp/consul/lib/serf.go b/vendor/github.com/hashicorp/consul/lib/serf.go
new file mode 100644
index 00000000000..ee7fad53002
--- /dev/null
+++ b/vendor/github.com/hashicorp/consul/lib/serf.go
@@ -0,0 +1,20 @@
+package lib
+
+import (
+ "github.com/hashicorp/serf/serf"
+)
+
+// SerfDefaultConfig returns a Consul-flavored Serf default configuration,
+// suitable as a basis for a LAN, WAN, segment, or area.
+func SerfDefaultConfig() *serf.Config {
+ base := serf.DefaultConfig()
+
+ // This effectively disables the annoying queue depth warnings.
+ base.QueueDepthWarning = 1000000
+
+ // This enables dynamic sizing of the message queue depth based on the
+ // cluster size.
+ base.MinQueueDepth = 4096
+
+ return base
+}
diff --git a/vendor/github.com/hashicorp/consul/testutil/retry/retry.go b/vendor/github.com/hashicorp/consul/testutil/retry/retry.go
index cfbdde3c9db..ba3a7c36b8b 100644
--- a/vendor/github.com/hashicorp/consul/testutil/retry/retry.go
+++ b/vendor/github.com/hashicorp/consul/testutil/retry/retry.go
@@ -82,7 +82,7 @@ func decorate(s string) string {
}
func Run(t Failer, f func(r *R)) {
- run(TwoSeconds(), t, f)
+ run(DefaultFailer(), t, f)
}
func RunWith(r Retryer, t Failer, f func(r *R)) {
@@ -133,6 +133,11 @@ func run(r Retryer, t Failer, f func(r *R)) {
}
}
+// DefaultFailer provides default retry.Run() behavior for unit tests.
+func DefaultFailer() *Timer {
+ return &Timer{Timeout: 7 * time.Second, Wait: 25 * time.Millisecond}
+}
+
// TwoSeconds repeats an operation for two seconds and waits 25ms in between.
func TwoSeconds() *Timer {
return &Timer{Timeout: 2 * time.Second, Wait: 25 * time.Millisecond}
diff --git a/vendor/github.com/hashicorp/consul/testutil/server.go b/vendor/github.com/hashicorp/consul/testutil/server.go
index 93b734a671b..06c0fdfd283 100644
--- a/vendor/github.com/hashicorp/consul/testutil/server.go
+++ b/vendor/github.com/hashicorp/consul/testutil/server.go
@@ -27,7 +27,7 @@ import (
"testing"
"time"
- "github.com/hashicorp/consul/test/porter"
+ "github.com/hashicorp/consul/lib/freeport"
"github.com/hashicorp/consul/testutil/retry"
"github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-uuid"
@@ -111,17 +111,7 @@ func defaultServerConfig() *TestServerConfig {
panic(err)
}
- ports, err := porter.RandomPorts(6)
- if err != nil {
- if _, ok := err.(*porter.PorterExistErr); ok {
- // Fall back in the case that the testutil server is being used
- // without porter. This should NEVER be used for Consul's own
- // unit tests. See comments for getRandomPorts() for more details.
- ports = getRandomPorts(6)
- } else {
- panic(err)
- }
- }
+ ports := freeport.Get(6)
return &TestServerConfig{
NodeName: "node-" + nodeID,
NodeID: nodeID,
@@ -324,7 +314,7 @@ func (s *TestServer) waitForAPI() error {
}
defer resp.Body.Close()
if err := s.requireOK(resp); err != nil {
- r.Fatal("failed OK respose", err)
+ r.Fatal("failed OK response", err)
}
})
if f.failed {
@@ -390,22 +380,3 @@ func (s *TestServer) waitForLeader() error {
}
return nil
}
-
-// getRandomPorts returns a set of random port or panics on error. This
-// is here to support external uses of testutil which may not have porter,
-// but this has been shown not to work well with parallel tests (such as
-// Consul's unit tests). This fallback should NEVER be used for Consul's
-// own tests.
-func getRandomPorts(n int) []int {
- ports := make([]int, n)
- for i := 0; i < n; i++ {
- l, err := net.Listen("tcp", ":0")
- if err != nil {
- panic(err)
- }
- l.Close()
- ports[i] = l.Addr().(*net.TCPAddr).Port
- }
-
- return ports
-}
diff --git a/vendor/github.com/hashicorp/serf/coordinate/client.go b/vendor/github.com/hashicorp/serf/coordinate/client.go
index 63f6241411b..3582ee4dae2 100644
--- a/vendor/github.com/hashicorp/serf/coordinate/client.go
+++ b/vendor/github.com/hashicorp/serf/coordinate/client.go
@@ -6,6 +6,8 @@ import (
"sort"
"sync"
"time"
+
+ "github.com/armon/go-metrics"
)
// Client manages the estimated network coordinate for a given node, and adjusts
@@ -205,6 +207,20 @@ func (c *Client) Update(node string, other *Coordinate, rtt time.Duration) (*Coo
return nil, err
}
+ // The code down below can handle zero RTTs, which we have seen in
+ // https://github.com/hashicorp/consul/issues/3789, presumably in
+ // environments with coarse-grained monotonic clocks (we are still
+ // trying to pin this down). In any event, this is ok from a code PoV
+ // so we don't need to alert operators with spammy messages. We did
+ // add a counter so this is still observable, though.
+ const maxRTT = 10 * time.Second
+ if rtt < 0 || rtt > maxRTT {
+ return nil, fmt.Errorf("round trip time not in valid range, duration %v is not a positive value less than %v ", rtt, maxRTT)
+ }
+ if rtt == 0 {
+ metrics.IncrCounter([]string{"serf", "coordinate", "zero-rtt"}, 1)
+ }
+
rttSeconds := c.latencyFilter(node, rtt.Seconds())
c.updateVivaldi(other, rttSeconds)
c.updateAdjustment(other, rttSeconds)
diff --git a/vendor/github.com/hashicorp/serf/serf/config.go b/vendor/github.com/hashicorp/serf/serf/config.go
index 74f21ffbdf7..ad4f51b18a7 100644
--- a/vendor/github.com/hashicorp/serf/serf/config.go
+++ b/vendor/github.com/hashicorp/serf/serf/config.go
@@ -112,6 +112,10 @@ type Config struct {
// node.
FlapTimeout time.Duration
+ // QueueCheckInterval is the interval at which we check the message
+ // queue to apply the warning and max depth.
+ QueueCheckInterval time.Duration
+
// QueueDepthWarning is used to generate warning message if the
// number of queued messages to broadcast exceeds this number. This
// is to provide the user feedback if events are being triggered
@@ -123,6 +127,12 @@ type Config struct {
// prevent an unbounded growth of memory utilization
MaxQueueDepth int
+ // MinQueueDepth, if >0 will enforce a lower limit for dropping messages
+ // and then the max will be max(MinQueueDepth, 2*SizeOfCluster). This
+ // defaults to 0 which disables this dynamic sizing feature. If this is
+ // >0 then MaxQueueDepth will be ignored.
+ MinQueueDepth int
+
// RecentIntentTimeout is used to determine how long we store recent
// join and leave intents. This is used to guard against the case where
// Serf broadcasts an intent that arrives before the Memberlist event.
@@ -253,6 +263,7 @@ func DefaultConfig() *Config {
RecentIntentTimeout: 5 * time.Minute,
ReconnectInterval: 30 * time.Second,
ReconnectTimeout: 24 * time.Hour,
+ QueueCheckInterval: 30 * time.Second,
QueueDepthWarning: 128,
MaxQueueDepth: 4096,
TombstoneTimeout: 24 * time.Hour,
diff --git a/vendor/github.com/hashicorp/serf/serf/ping_delegate.go b/vendor/github.com/hashicorp/serf/serf/ping_delegate.go
index a4c44028eda..98032c5bea1 100644
--- a/vendor/github.com/hashicorp/serf/serf/ping_delegate.go
+++ b/vendor/github.com/hashicorp/serf/serf/ping_delegate.go
@@ -68,7 +68,8 @@ func (p *pingDelegate) NotifyPingComplete(other *memberlist.Node, rtt time.Durat
before := p.serf.coordClient.GetCoordinate()
after, err := p.serf.coordClient.Update(other.Name, &coord, rtt)
if err != nil {
- p.serf.logger.Printf("[ERR] serf: Rejected coordinate from %s: %v\n",
+ metrics.IncrCounter([]string{"serf", "coordinate", "rejected"}, 1)
+ p.serf.logger.Printf("[TRACE] serf: Rejected coordinate from %s: %v\n",
other.Name, err)
return
}
diff --git a/vendor/github.com/hashicorp/serf/serf/serf.go b/vendor/github.com/hashicorp/serf/serf/serf.go
index 7256eafab1c..3e89fa11f8b 100644
--- a/vendor/github.com/hashicorp/serf/serf/serf.go
+++ b/vendor/github.com/hashicorp/serf/serf/serf.go
@@ -314,7 +314,6 @@ func Create(conf *Config) (*Serf, error) {
conf.RejoinAfterLeave,
serf.logger,
&serf.clock,
- serf.coordClient,
conf.EventCh,
serf.shutdownCh)
if err != nil {
@@ -1516,21 +1515,37 @@ func (s *Serf) reconnect() {
s.memberlist.Join([]string{addr.String()})
}
+// getQueueMax returns the queue depth limit: config.MaxQueueDepth, or, when
+// config.MinQueueDepth > 0, the larger of MinQueueDepth and 2*len(s.members).
+func (s *Serf) getQueueMax() int {
+ max := s.config.MaxQueueDepth
+ if s.config.MinQueueDepth > 0 {
+ s.memberLock.RLock()
+ max = 2 * len(s.members)
+ s.memberLock.RUnlock()
+
+ if max < s.config.MinQueueDepth {
+ max = s.config.MinQueueDepth
+ }
+ }
+ return max
+}
+
// checkQueueDepth periodically checks the size of a queue to see if
// it is too large
func (s *Serf) checkQueueDepth(name string, queue *memberlist.TransmitLimitedQueue) {
for {
select {
- case <-time.After(time.Second):
+ case <-time.After(s.config.QueueCheckInterval):
numq := queue.NumQueued()
metrics.AddSample([]string{"serf", "queue", name}, float32(numq))
if numq >= s.config.QueueDepthWarning {
s.logger.Printf("[WARN] serf: %s queue depth: %d", name, numq)
}
- if numq > s.config.MaxQueueDepth {
+ if max := s.getQueueMax(); numq > max {
s.logger.Printf("[WARN] serf: %s queue depth (%d) exceeds limit (%d), dropping messages!",
- name, numq, s.config.MaxQueueDepth)
- queue.Prune(s.config.MaxQueueDepth)
+ name, numq, max)
+ queue.Prune(max)
}
case <-s.shutdownCh:
return
@@ -1654,11 +1669,18 @@ func (s *Serf) Stats() map[string]string {
toString := func(v uint64) string {
return strconv.FormatUint(v, 10)
}
+ s.memberLock.RLock()
+ members := toString(uint64(len(s.members)))
+ failed := toString(uint64(len(s.failedMembers)))
+ left := toString(uint64(len(s.leftMembers)))
+ health_score := toString(uint64(s.memberlist.GetHealthScore()))
+
+ s.memberLock.RUnlock()
stats := map[string]string{
- "members": toString(uint64(len(s.members))),
- "failed": toString(uint64(len(s.failedMembers))),
- "left": toString(uint64(len(s.leftMembers))),
- "health_score": toString(uint64(s.memberlist.GetHealthScore())),
+ "members": members,
+ "failed": failed,
+ "left": left,
+ "health_score": health_score,
"member_time": toString(uint64(s.clock.Time())),
"event_time": toString(uint64(s.eventClock.Time())),
"query_time": toString(uint64(s.queryClock.Time())),
diff --git a/vendor/github.com/hashicorp/serf/serf/snapshot.go b/vendor/github.com/hashicorp/serf/serf/snapshot.go
index 8e15f3f31d0..9f5adebe625 100644
--- a/vendor/github.com/hashicorp/serf/serf/snapshot.go
+++ b/vendor/github.com/hashicorp/serf/serf/snapshot.go
@@ -2,7 +2,6 @@ package serf
import (
"bufio"
- "encoding/json"
"fmt"
"log"
"math/rand"
@@ -13,7 +12,6 @@ import (
"time"
"github.com/armon/go-metrics"
- "github.com/hashicorp/serf/coordinate"
)
/*
@@ -29,34 +27,32 @@ old events.
const flushInterval = 500 * time.Millisecond
const clockUpdateInterval = 500 * time.Millisecond
-const coordinateUpdateInterval = 60 * time.Second
const tmpExt = ".compact"
const snapshotErrorRecoveryInterval = 30 * time.Second
// Snapshotter is responsible for ingesting events and persisting
// them to disk, and providing a recovery mechanism at start time.
type Snapshotter struct {
- aliveNodes map[string]string
- clock *LamportClock
- coordClient *coordinate.Client
- fh *os.File
- buffered *bufio.Writer
- inCh <-chan Event
- lastFlush time.Time
- lastClock LamportTime
- lastEventClock LamportTime
- lastQueryClock LamportTime
- leaveCh chan struct{}
- leaving bool
- logger *log.Logger
- maxSize int64
- path string
- offset int64
- outCh chan<- Event
- rejoinAfterLeave bool
- shutdownCh <-chan struct{}
- waitCh chan struct{}
- lastAttemptedCompaction time.Time
+ aliveNodes map[string]string
+ clock *LamportClock
+ fh *os.File
+ buffered *bufio.Writer
+ inCh <-chan Event
+ lastFlush time.Time
+ lastClock LamportTime
+ lastEventClock LamportTime
+ lastQueryClock LamportTime
+ leaveCh chan struct{}
+ leaving bool
+ logger *log.Logger
+ maxSize int64
+ path string
+ offset int64
+ outCh chan<- Event
+ rejoinAfterLeave bool
+ shutdownCh <-chan struct{}
+ waitCh chan struct{}
+ lastAttemptedCompaction time.Time
}
// PreviousNode is used to represent the previously known alive nodes
@@ -80,7 +76,6 @@ func NewSnapshotter(path string,
rejoinAfterLeave bool,
logger *log.Logger,
clock *LamportClock,
- coordClient *coordinate.Client,
outCh chan<- Event,
shutdownCh <-chan struct{}) (chan<- Event, *Snapshotter, error) {
inCh := make(chan Event, 1024)
@@ -103,7 +98,6 @@ func NewSnapshotter(path string,
snap := &Snapshotter{
aliveNodes: make(map[string]string),
clock: clock,
- coordClient: coordClient,
fh: fh,
buffered: bufio.NewWriter(fh),
inCh: inCh,
@@ -182,9 +176,6 @@ func (s *Snapshotter) stream() {
clockTicker := time.NewTicker(clockUpdateInterval)
defer clockTicker.Stop()
- coordinateTicker := time.NewTicker(coordinateUpdateInterval)
- defer coordinateTicker.Stop()
-
for {
select {
case <-s.leaveCh:
@@ -226,9 +217,6 @@ func (s *Snapshotter) stream() {
case <-clockTicker.C:
s.updateClock()
- case <-coordinateTicker.C:
- s.updateCoordinate()
-
case <-s.shutdownCh:
if err := s.buffered.Flush(); err != nil {
s.logger.Printf("[ERR] serf: failed to flush snapshot: %v", err)
@@ -275,20 +263,6 @@ func (s *Snapshotter) updateClock() {
}
}
-// updateCoordinate is called periodically to write out the current local
-// coordinate. It's safe to call this if coordinates aren't enabled (nil
-// client) and it will be a no-op.
-func (s *Snapshotter) updateCoordinate() {
- if s.coordClient != nil {
- encoded, err := json.Marshal(s.coordClient.GetCoordinate())
- if err != nil {
- s.logger.Printf("[ERR] serf: Failed to encode coordinate: %v", err)
- } else {
- s.tryAppend(fmt.Sprintf("coordinate: %s\n", encoded))
- }
- }
-}
-
// processUserEvent is used to handle a single user event
func (s *Snapshotter) processUserEvent(e UserEvent) {
// Ignore old clocks
@@ -404,30 +378,22 @@ func (s *Snapshotter) compact() error {
}
offset += int64(n)
- // Write out the coordinate.
- if s.coordClient != nil {
- encoded, err := json.Marshal(s.coordClient.GetCoordinate())
- if err != nil {
- fh.Close()
- return err
- }
-
- line = fmt.Sprintf("coordinate: %s\n", encoded)
- n, err = buf.WriteString(line)
- if err != nil {
- fh.Close()
- return err
- }
- offset += int64(n)
- }
-
// Flush the new snapshot
err = buf.Flush()
- fh.Close()
+
if err != nil {
return fmt.Errorf("failed to flush new snapshot: %v", err)
}
+ err = fh.Sync()
+
+ if err != nil {
+ fh.Close()
+ return fmt.Errorf("failed to fsync new snapshot: %v", err)
+ }
+
+ fh.Close()
+
// We now need to swap the old snapshot file with the new snapshot.
// Turns out, Windows won't let us rename the files if we have
// open handles to them or if the destination already exists. This
@@ -533,22 +499,7 @@ func (s *Snapshotter) replay() error {
s.lastQueryClock = LamportTime(timeInt)
} else if strings.HasPrefix(line, "coordinate: ") {
- if s.coordClient == nil {
- s.logger.Printf("[WARN] serf: Ignoring snapshot coordinates since they are disabled")
- continue
- }
-
- coordStr := strings.TrimPrefix(line, "coordinate: ")
- var coord coordinate.Coordinate
- err := json.Unmarshal([]byte(coordStr), &coord)
- if err != nil {
- s.logger.Printf("[WARN] serf: Failed to decode coordinate: %v", err)
- continue
- }
- if err := s.coordClient.SetCoordinate(&coord); err != nil {
- s.logger.Printf("[WARN] serf: Failed to set coordinate: %v", err)
- continue
- }
+ continue // Ignores any coordinate persistence from old snapshots, serf should re-converge
} else if line == "leave" {
// Ignore a leave if we plan on re-joining
if s.rejoinAfterLeave {
diff --git a/vendor/vendor.json b/vendor/vendor.json
index 112d5d49ce2..c543ecfdcb3 100644
--- a/vendor/vendor.json
+++ b/vendor/vendor.json
@@ -124,14 +124,14 @@
{"path":"github.com/hashicorp/consul-template/template","checksumSHA1":"N9qobVzScLbTEnGE7MgFnnTbGBw=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"},
{"path":"github.com/hashicorp/consul-template/version","checksumSHA1":"NB5+D4AuCNV9Bsqh3YFdPi4AJ6U=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"},
{"path":"github.com/hashicorp/consul-template/watch","checksumSHA1":"b4+Y+02pY2Y5620F9ALzKg8Zmdw=","revision":"26d029ad37335b3827a9fde5569b2c5e10dcac8f","revisionTime":"2017-10-31T14:25:17Z"},
- {"path":"github.com/hashicorp/consul/agent/consul/autopilot","checksumSHA1":"+I7fgoQlrnTUGW5krqNLadWwtjg=","revision":"d1ede2c93dec7b4580e37ef41d24371abab9d9e9","revisionTime":"2018-02-21T18:19:48Z"},
- {"path":"github.com/hashicorp/consul/api","checksumSHA1":"XLfcIX2qpRr0o26aFMjCOzvw6jo=","revision":"51ea240df8476e02215d53fbfad5838bf0d44d21","revisionTime":"2017-10-16T16:22:40Z"},
- {"path":"github.com/hashicorp/consul/command/flags","checksumSHA1":"XTQIYV+DPUVRKpVp0+y/78bWH3I=","revision":"d08ab9fd199434e5220276356ecf9617cfec1eb2","revisionTime":"2017-12-18T20:26:35Z"},
- {"path":"github.com/hashicorp/consul/lib","checksumSHA1":"HGljdtVaqi/e3DgIHymLRLfPYhw=","revision":"bcafded4e60982d0b71e730f0b8564d73cb1d715","revisionTime":"2017-10-31T16:39:15Z"},
- {"path":"github.com/hashicorp/consul/lib/freeport","checksumSHA1":"hDJiPli3EEGJE4vAezMi05oOC7o=","revision":"bcafded4e60982d0b71e730f0b8564d73cb1d715","revisionTime":"2017-10-31T16:39:15Z"},
+ {"path":"github.com/hashicorp/consul/agent/consul/autopilot","checksumSHA1":"+I7fgoQlrnTUGW5krqNLadWwtjg=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
+ {"path":"github.com/hashicorp/consul/api","checksumSHA1":"7UvyPiYTxcB8xqRlULAT3X8+8zE=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
+ {"path":"github.com/hashicorp/consul/command/flags","checksumSHA1":"soNN4xaHTbeXFgNkZ7cX0gbFXQk=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
+ {"path":"github.com/hashicorp/consul/lib","checksumSHA1":"Nrh9BhiivRyJiuPzttstmq9xl/w=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
+ {"path":"github.com/hashicorp/consul/lib/freeport","checksumSHA1":"E28E4zR1FN2v1Xiq4FUER7KVN9M=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
{"path":"github.com/hashicorp/consul/test/porter","checksumSHA1":"5XjgqE4UIfwXvkq5VssGNc7uPhQ=","revision":"ad9425ca6353b8afcfebd19130a8cf768f7eac30","revisionTime":"2017-10-21T00:05:25Z"},
- {"path":"github.com/hashicorp/consul/testutil","checksumSHA1":"+go9ycmyfF4b0W174gc7ej5mnE8=","revision":"350932161d6745836c1b2f39849bddb0f9fb52fd","revisionTime":"2017-10-20T23:49:17Z"},
- {"path":"github.com/hashicorp/consul/testutil/retry","checksumSHA1":"J8TTDc84MvAyXE/FrfgS+xc/b6s=","revision":"350932161d6745836c1b2f39849bddb0f9fb52fd","revisionTime":"2017-10-20T23:49:17Z"},
+ {"path":"github.com/hashicorp/consul/testutil","checksumSHA1":"T4CeQD+QRsjf1BJ1n7FSojS5zDQ=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
+ {"path":"github.com/hashicorp/consul/testutil/retry","checksumSHA1":"SCb2b91UYiB/23+SNDBlU5OZfFA=","revision":"fb848fc48818f58690db09d14640513aa6bf3c02","revisionTime":"2018-04-13T17:05:42Z"},
{"path":"github.com/hashicorp/errwrap","revision":"7554cd9344cec97297fa6649b055a8c98c2a1e55"},
{"path":"github.com/hashicorp/go-checkpoint","checksumSHA1":"D267IUMW2rcb+vNe3QU+xhfSrgY=","revision":"1545e56e46dec3bba264e41fde2c1e2aa65b5dd4","revisionTime":"2017-10-09T17:35:28Z"},
{"path":"github.com/hashicorp/go-cleanhttp","checksumSHA1":"6ihdHMkDfFx/rJ1A36com2F6bQk=","revision":"a45970658e51fea2c41445ff0f7e07106d007617","revisionTime":"2017-02-11T00:33:01Z"},
@@ -168,8 +168,8 @@
{"path":"github.com/hashicorp/net-rpc-msgpackrpc","revision":"a14192a58a694c123d8fe5481d4a4727d6ae82f3"},
{"path":"github.com/hashicorp/raft","checksumSHA1":"zkA9uvbj1BdlveyqXpVTh1N6ers=","revision":"077966dbc90f342107eb723ec52fdb0463ec789b","revisionTime":"2018-01-17T20:29:25Z","version":"master","versionExact":"master"},
{"path":"github.com/hashicorp/raft-boltdb","checksumSHA1":"QAxukkv54/iIvLfsUP6IK4R0m/A=","revision":"d1e82c1ec3f15ee991f7cc7ffd5b67ff6f5bbaee","revisionTime":"2015-02-01T20:08:39Z"},
- {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"/oss17GO4hXGM7QnUdI3VzcAHzA=","revision":"bbeddf0b3ab3072a60525afbd6b6f47d33839eee","revisionTime":"2017-07-14T18:26:01Z"},
- {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"pvLOzocYsZtxuJ9pREHRTxYnoa4=","revision":"bbeddf0b3ab3072a60525afbd6b6f47d33839eee","revisionTime":"2017-07-14T18:26:01Z"},
+ {"path":"github.com/hashicorp/serf/coordinate","checksumSHA1":"0PeWsO2aI+2PgVYlYlDPKfzCLEQ=","revision":"fc4bdedf2366c64984e280c6eefc703ca7812585","revisionTime":"2018-04-11T17:01:37Z"},
+ {"path":"github.com/hashicorp/serf/serf","checksumSHA1":"YzJaaeIJpxLfVDZYT1X2hpd8IK8=","revision":"fc4bdedf2366c64984e280c6eefc703ca7812585","revisionTime":"2018-04-11T17:01:37Z"},
{"path":"github.com/hashicorp/vault","checksumSHA1":"eGzvBRMFD6ZB3A6uO750np7Om/E=","revision":"182ba68a9589d4cef95234134aaa498a686e3de3","revisionTime":"2016-08-21T23:40:57Z"},
{"path":"github.com/hashicorp/vault/api","checksumSHA1":"mKN4rEIWyflT6aqJyjgu9m1tPXI=","revision":"3ddd3bd20cec0588788547aecd15e91461b9d546","revisionTime":"2018-04-03T21:11:47Z"},
{"path":"github.com/hashicorp/vault/helper/compressutil","checksumSHA1":"jHVLe8KMdEpb/ZALp0zu+tenADo=","revision":"3ddd3bd20cec0588788547aecd15e91461b9d546","revisionTime":"2018-04-03T21:11:47Z"},
diff --git a/website/source/docs/job-specification/service.html.md b/website/source/docs/job-specification/service.html.md
index 2c465ca48b5..7ebbe4c5b87 100644
--- a/website/source/docs/job-specification/service.html.md
+++ b/website/source/docs/job-specification/service.html.md
@@ -72,7 +72,7 @@ does not automatically enable service discovery.
- `check` <code>([Check](#check-parameters): nil)</code> - Specifies a health
check associated with the service. This can be specified multiple times to
define multiple checks for the service. At this time, Nomad supports the
- `script`<sup><small>1</small></sup>, `http` and `tcp` checks.
+ `grpc`, `http`, `script`<sup><small>1</small></sup>, and `tcp` checks.
- `name` `(string: "--")` - Specifies the name this service
will be advertised as in Consul. If not supplied, this will default to the
@@ -160,6 +160,12 @@ scripts.
parameter. To achieve the behavior of shell operators, specify the command
as a shell, like `/bin/bash` and then use `args` to run the check.
+- `grpc_service` `(string: <optional>)` - What service, if any, to specify in
+ the gRPC health check. gRPC health checks require Consul 1.0.5 or later.
+
+- `grpc_use_tls` `(bool: false)` - Use TLS to perform a gRPC health check. May
+ be used with `tls_skip_verify` to use TLS but skip certificate verification.
+
- `initial_status` `(string: <optional>)` - Specifies the originating status of the
service. Valid options are the empty string, `passing`, `warning`, and
`critical`.
@@ -186,9 +192,9 @@ scripts.
in the [`network`][network] stanza. If a port value was declared on the
`service`, this will inherit from that value if not supplied. If supplied,
this value takes precedence over the `service.port` value. This is useful for
- services which operate on multiple ports. `http` and `tcp` checks require a
- port while `script` checks do not. Checks will use the host IP and ports by
- default. In Nomad 0.7.1 or later numeric ports may be used if
+ services which operate on multiple ports. `grpc`, `http`, and `tcp` checks
+ require a port while `script` checks do not. Checks will use the host IP and
+ ports by default. In Nomad 0.7.1 or later numeric ports may be used if
`address_mode="driver"` is set on the check.
- `protocol` `(string: "http")` - Specifies the protocol for the http-based
@@ -199,7 +205,8 @@ scripts.
"30s" or "1h". This must be greater than or equal to "1s"
- `type` `(string: )` - This indicates the check types supported by
- Nomad. Valid options are `script`, `http`, and `tcp`.
+ Nomad. Valid options are `grpc`, `http`, `script`, and `tcp`. gRPC health
+ checks require Consul 1.0.5 or later.
- `tls_skip_verify` `(bool: false)` - Skip verifying TLS certificates for HTTPS
checks. Requires Consul >= 0.7.2.
@@ -355,6 +362,32 @@ service {
}
```
+### gRPC Health Check
+
+gRPC health checks use the same host and port behavior as `http` and `tcp`
+checks, but gRPC checks also have an optional gRPC service to health check. Not
+all gRPC applications require a service to health check. gRPC health checks
+require Consul 1.0.5 or later.
+
+```hcl
+service {
+ check {
+ type = "grpc"
+ port = "rpc"
+ interval = "5s"
+ timeout = "2s"
+ grpc_service = "example.Service"
+ grpc_use_tls = true
+ tls_skip_verify = true
+ }
+}
+```
+
+In this example Consul would health check the `example.Service` service on the
+`rpc` port defined in the task's [network resources][network] stanza. See
+[Using Driver Address Mode](#using-driver-address-mode) for details on address
+selection.
+
### Using Driver Address Mode
The [Docker](/docs/drivers/docker.html#network_mode) and
@@ -582,6 +615,7 @@ advertise and check directly since Nomad isn't managing any port assignments.
system of a task for that driver.
[check_restart_stanza]: /docs/job-specification/check_restart.html "check_restart stanza"
+[consul_grpc]: https://www.consul.io/api/agent/check.html#grpc
[service-discovery]: /docs/service-discovery/index.html "Nomad Service Discovery"
[interpolation]: /docs/runtime/interpolation.html "Nomad Runtime Interpolation"
[network]: /docs/job-specification/network.html "Nomad network Job Specification"