diff --git a/command/commands.go b/command/commands.go index 745ae8851d41..5649edbe0c73 100644 --- a/command/commands.go +++ b/command/commands.go @@ -62,6 +62,9 @@ import ( physZooKeeper "github.com/hashicorp/vault/physical/zookeeper" physFile "github.com/hashicorp/vault/sdk/physical/file" physInmem "github.com/hashicorp/vault/sdk/physical/inmem" + + sr "github.com/hashicorp/vault/serviceregistration" + csr "github.com/hashicorp/vault/serviceregistration/consul" ) const ( @@ -155,6 +158,10 @@ var ( "raft": physRaft.NewRaftBackend, "zookeeper": physZooKeeper.NewZooKeeperBackend, } + + serviceRegistrations = map[string]sr.Factory{ + "consul": csr.NewConsulServiceRegistration, + } ) // Commands is the mapping of all the available commands. @@ -517,9 +524,12 @@ func initCommands(ui, serverCmdUi cli.Ui, runOpts *RunOptions) { CredentialBackends: credentialBackends, LogicalBackends: logicalBackends, PhysicalBackends: physicalBackends, - ShutdownCh: MakeShutdownCh(), - SighupCh: MakeSighupCh(), - SigUSR2Ch: MakeSigUSR2Ch(), + + ServiceRegistrations: serviceRegistrations, + + ShutdownCh: MakeShutdownCh(), + SighupCh: MakeSighupCh(), + SigUSR2Ch: MakeSigUSR2Ch(), }, nil }, "ssh": func() (cli.Command, error) { diff --git a/command/server.go b/command/server.go index 10f93b50758b..8ee542359423 100644 --- a/command/server.go +++ b/command/server.go @@ -47,6 +47,7 @@ import ( "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/sdk/physical" "github.com/hashicorp/vault/sdk/version" + sr "github.com/hashicorp/vault/serviceregistration" "github.com/hashicorp/vault/vault" vaultseal "github.com/hashicorp/vault/vault/seal" shamirseal "github.com/hashicorp/vault/vault/seal/shamir" @@ -79,6 +80,8 @@ type ServerCommand struct { LogicalBackends map[string]logical.Factory PhysicalBackends map[string]physical.Factory + ServiceRegistrations map[string]sr.Factory + ShutdownCh chan struct{} SighupCh chan struct{} SigUSR2Ch chan struct{} @@ -935,6 +938,24 @@ func (c *ServerCommand) Run(args []string) int { return 1 } + // Initialize the Service Discovery, if there is one + var configSR sr.ServiceRegistration + if config.ServiceRegistration != nil { + sdFactory, ok := c.ServiceRegistrations[config.ServiceRegistration.Type] + if !ok { + c.UI.Error(fmt.Sprintf("Unknown service_registration type %s", config.ServiceRegistration.Type)) + return 1 + } + + namedSDLogger := c.logger.Named("service_registration." + config.ServiceRegistration.Type) + allLoggers = append(allLoggers, namedSDLogger) + configSR, err = sdFactory(config.ServiceRegistration.Config, namedSDLogger) + if err != nil { + c.UI.Error(fmt.Sprintf("Error initializing service_registration of type %s: %s", config.ServiceRegistration.Type, err)) + return 1 + } + } + infoKeys := make([]string, 0, 10) info := make(map[string]string) info["log level"] = logLevelString @@ -1019,6 +1040,7 @@ func (c *ServerCommand) Run(args []string) int { RedirectAddr: config.Storage.RedirectAddr, StorageType: config.Storage.Type, HAPhysical: nil, + ServiceRegistration: configSR, Seal: barrierSeal, AuditBackends: c.AuditBackends, CredentialBackends: c.CredentialBackends, @@ -1218,6 +1240,13 @@ CLUSTER_SYNTHESIS_COMPLETE: } } + // If ServiceRegistration is configured, then the backend must support HA + isBackendHA := coreConfig.HAPhysical != nil && coreConfig.HAPhysical.HAEnabled() + if (coreConfig.ServiceRegistration != nil) && !isBackendHA { + c.UI.Output("service_registration is configured, but storage does not support HA") + return 1 + } + // Initialize the core core, newCoreError := vault.NewCore(coreConfig) if newCoreError != nil { @@ -1473,21 +1502,17 @@ CLUSTER_SYNTHESIS_COMPLETE: // Instantiate the wait group c.WaitGroup = &sync.WaitGroup{} - // If the backend supports service discovery, run service discovery - if coreConfig.HAPhysical != nil && coreConfig.HAPhysical.HAEnabled() { - sd, ok := coreConfig.HAPhysical.(physical.ServiceDiscovery) - if ok { - activeFunc := func() bool { - if isLeader, _, _, err := core.Leader(); err == nil { - return isLeader - } - return false - } - - if err := sd.RunServiceDiscovery(c.WaitGroup, c.ShutdownCh, coreConfig.RedirectAddr, activeFunc, core.Sealed, core.PerfStandby); err != nil { - c.UI.Error(fmt.Sprintf("Error initializing service discovery: %v", err)) - return 1 + // If service discovery is available, run service discovery + if disc := coreConfig.GetServiceRegistration(); disc != nil { + activeFunc := func() bool { + if isLeader, _, _, err := core.Leader(); err == nil { + return isLeader } + return false + } + if err := disc.RunServiceRegistration(c.WaitGroup, c.ShutdownCh, coreConfig.RedirectAddr, activeFunc, core.Sealed, core.PerfStandby); err != nil { + c.UI.Error(fmt.Sprintf("Error initializing service discovery: %v", err)) + return 1 } } diff --git a/command/server/config.go b/command/server/config.go index 41f9150f2bce..e78656aec1e6 100644 --- a/command/server/config.go +++ b/command/server/config.go @@ -28,6 +28,8 @@ type Config struct { Storage *Storage `hcl:"-"` HAStorage *Storage `hcl:"-"` + ServiceRegistration *ServiceRegistration `hcl:"-"` + Seals []*Seal `hcl:"-"` Entropy *Entropy `hcl:"-"` @@ -150,6 +152,16 @@ func (b *Storage) GoString() string { return fmt.Sprintf("*%#v", *b) } +// ServiceRegistration is the optional service discovery for the server. +type ServiceRegistration struct { + Type string + Config map[string]string +} + +func (b *ServiceRegistration) GoString() string { + return fmt.Sprintf("*%#v", *b) +} + // Seal contains Seal configuration for the server type Seal struct { Type string @@ -292,6 +304,11 @@ func (c *Config) Merge(c2 *Config) *Config { result.HAStorage = c2.HAStorage } + result.ServiceRegistration = c.ServiceRegistration + if c2.ServiceRegistration != nil { + result.ServiceRegistration = c2.ServiceRegistration + } + result.Entropy = c.Entropy if c2.Entropy != nil { result.Entropy = c2.Entropy @@ -585,6 +602,13 @@ func ParseConfig(d string) (*Config, error) { } } + // Parse service discovery + if o := list.Filter("service_registration"); len(o.Items) > 0 { + if err := parseServiceRegistration(&result, o, "service_registration"); err != nil { + return nil, errwrap.Wrapf("error parsing 'service_registration': {{err}}", err) + } + } + if o := list.Filter("hsm"); len(o.Items) > 0 { if err := parseSeals(&result, o, "hsm"); err != nil { return nil, errwrap.Wrapf("error parsing 'hsm': {{err}}", err) @@ -829,6 +853,30 @@ func parseHAStorage(result *Config, list *ast.ObjectList, name string) error { return nil } +func parseServiceRegistration(result *Config, list *ast.ObjectList, name string) error { + if len(list.Items) > 1 { + return fmt.Errorf("only one %q block is permitted", name) + } + + // Get our item + item := list.Items[0] + key := name + if len(item.Keys) > 0 { + key = item.Keys[0].Token.Value().(string) + } + + var m map[string]string + if err := hcl.DecodeObject(&m, item.Val); err != nil { + return multierror.Prefix(err, fmt.Sprintf("%s.%s:", name, key)) + } + + result.ServiceRegistration = &ServiceRegistration{ + Type: strings.ToLower(key), + Config: m, + } + return nil +} + func parseSeals(result *Config, list *ast.ObjectList, blockName string) error { if len(list.Items) > 2 { return fmt.Errorf("only two or less %q blocks are permitted", blockName) @@ -1009,6 +1057,14 @@ func (c *Config) Sanitized() map[string]interface{} { result["ha_storage"] = sanitizedHAStorage } + // Sanitize service_registration stanza + if c.ServiceRegistration != nil { + sanitizedServiceRegistration := map[string]interface{}{ + "type": c.ServiceRegistration.Type, + } + result["service_registration"] = sanitizedServiceRegistration + } + // Sanitize seals stanza if len(c.Seals) != 0 { var sanitizedSeals []interface{} diff --git a/command/server/config_test_helpers.go b/command/server/config_test_helpers.go index 12fff634b20d..b639a7e01b84 100644 --- a/command/server/config_test_helpers.go +++ b/command/server/config_test_helpers.go @@ -47,6 +47,13 @@ func testLoadConfigFile_topLevel(t *testing.T, entropy *Entropy) { DisableClustering: true, }, + ServiceRegistration: &ServiceRegistration{ + Type: "consul", + Config: map[string]string{ + "foo": "bar", + }, + }, + Telemetry: &Telemetry{ StatsdAddr: "bar", StatsiteAddr: "foo", @@ -126,6 +133,13 @@ func testLoadConfigFile_json2(t *testing.T, entropy *Entropy) { DisableClustering: true, }, + ServiceRegistration: &ServiceRegistration{ + Type: "consul", + Config: map[string]string{ + "foo": "bar", + }, + }, + CacheSize: 45678, EnableUI: true, @@ -261,6 +275,13 @@ func testLoadConfigFile(t *testing.T) { DisableClustering: true, }, + ServiceRegistration: &ServiceRegistration{ + Type: "consul", + Config: map[string]string{ + "foo": "bar", + }, + }, + Telemetry: &Telemetry{ StatsdAddr: "bar", StatsiteAddr: "foo", @@ -324,6 +345,13 @@ func testLoadConfigFile_json(t *testing.T) { DisableClustering: true, }, + ServiceRegistration: &ServiceRegistration{ + Type: "consul", + Config: map[string]string{ + "foo": "bar", + }, + }, + ClusterCipherSuites: "TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256,TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA", Telemetry: &Telemetry{ @@ -476,6 +504,9 @@ func testConfig_Sanitized(t *testing.T) { "redirect_addr": "top_level_api_addr", "type": "consul", }, + "service_registration": map[string]interface{}{ + "type": "consul", + }, "telemetry": map[string]interface{}{ "circonus_api_app": "", "circonus_api_token": "", diff --git a/command/server/test-fixtures/config.hcl b/command/server/test-fixtures/config.hcl index bd45bcfc26af..e3c1ebe44e99 100644 --- a/command/server/test-fixtures/config.hcl +++ b/command/server/test-fixtures/config.hcl @@ -18,6 +18,10 @@ ha_backend "consul" { disable_clustering = "true" } +service_registration "consul" { + foo = "bar" +} + telemetry { statsd_address = "bar" statsite_address = "foo" diff --git a/command/server/test-fixtures/config.hcl.json b/command/server/test-fixtures/config.hcl.json index 9f81018a0ff4..df79b84af3fb 100644 --- a/command/server/test-fixtures/config.hcl.json +++ b/command/server/test-fixtures/config.hcl.json @@ -11,6 +11,11 @@ "disable_clustering": "true" } }, + "service_registration": { + "consul": { + "foo": "bar", + } + }, "telemetry": { "statsite_address": "baz" }, diff --git a/command/server/test-fixtures/config2.hcl b/command/server/test-fixtures/config2.hcl index 7fd032b480ad..33244e54e8f5 100644 --- a/command/server/test-fixtures/config2.hcl +++ b/command/server/test-fixtures/config2.hcl @@ -21,6 +21,10 @@ ha_storage "consul" { disable_clustering = "true" } +service_registration "consul" { + foo = "bar" +} + telemetry { statsd_address = "bar" statsite_address = "foo" diff --git a/command/server/test-fixtures/config2.hcl.json b/command/server/test-fixtures/config2.hcl.json index dede6b812350..16510ad9de17 100644 --- a/command/server/test-fixtures/config2.hcl.json +++ b/command/server/test-fixtures/config2.hcl.json @@ -25,6 +25,11 @@ "disable_clustering": "true" } }, + "service_registration":{ + "consul":{ + "foo":"bar" + } + }, "cache_size": 45678, "telemetry":{ "statsd_address":"bar", diff --git a/command/server/test-fixtures/config3.hcl b/command/server/test-fixtures/config3.hcl index 48ac9a156433..1358c2b18e81 100644 --- a/command/server/test-fixtures/config3.hcl +++ b/command/server/test-fixtures/config3.hcl @@ -22,6 +22,10 @@ ha_backend "consul" { token = "foo" } +service_registration "consul" { + token = "foo" +} + telemetry { statsd_address = "bar" circonus_api_token = "baz" @@ -38,4 +42,4 @@ default_lease_ttl = "10h" cluster_name = "testcluster" pid_file = "./pidfile" raw_storage_endpoint = true -disable_sealwrap = true \ No newline at end of file +disable_sealwrap = true diff --git a/physical/consul/consul.go b/physical/consul/consul.go index 4db2b3628754..f835e66d76ac 100644 --- a/physical/consul/consul.go +++ b/physical/consul/consul.go @@ -4,58 +4,23 @@ import ( "context" "errors" "fmt" - "io/ioutil" - "net" - "net/http" - "net/url" - "regexp" "strconv" "strings" - "sync" - "sync/atomic" "time" - "golang.org/x/net/http2" - log "github.com/hashicorp/go-hclog" - "crypto/tls" - "crypto/x509" - metrics "github.com/armon/go-metrics" "github.com/hashicorp/consul/api" "github.com/hashicorp/errwrap" multierror "github.com/hashicorp/go-multierror" - "github.com/hashicorp/vault/sdk/helper/consts" "github.com/hashicorp/vault/sdk/helper/parseutil" - "github.com/hashicorp/vault/sdk/helper/strutil" - "github.com/hashicorp/vault/sdk/helper/tlsutil" "github.com/hashicorp/vault/sdk/physical" + sr "github.com/hashicorp/vault/serviceregistration" + csr "github.com/hashicorp/vault/serviceregistration/consul" ) const ( - // checkJitterFactor specifies the jitter factor used to stagger checks - checkJitterFactor = 16 - - // checkMinBuffer specifies provides a guarantee that a check will not - // be executed too close to the TTL check timeout - checkMinBuffer = 100 * time.Millisecond - - // consulRetryInterval specifies the retry duration to use when an - // API call to the Consul agent fails. - consulRetryInterval = 1 * time.Second - - // defaultCheckTimeout changes the timeout of TTL checks - defaultCheckTimeout = 5 * time.Second - - // DefaultServiceName is the default Consul service name used when - // advertising a Vault instance. - DefaultServiceName = "vault" - - // reconcileTimeout is how often Vault should query Consul to detect - // and fix any state drift. - reconcileTimeout = 60 * time.Second - // consistencyModeDefault is the configuration value used to tell // consul to use default consistency. consistencyModeDefault = "default" @@ -65,41 +30,23 @@ const ( consistencyModeStrong = "strong" ) -type notifyEvent struct{} - // Verify ConsulBackend satisfies the correct interfaces var _ physical.Backend = (*ConsulBackend)(nil) var _ physical.HABackend = (*ConsulBackend)(nil) var _ physical.Lock = (*ConsulLock)(nil) var _ physical.Transactional = (*ConsulBackend)(nil) -var _ physical.ServiceDiscovery = (*ConsulBackend)(nil) - -var ( - hostnameRegex = regexp.MustCompile(`^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$`) -) +var _ sr.ServiceRegistration = (*ConsulBackend)(nil) // ConsulBackend is a physical backend that stores data at specific // prefix within Consul. It is used for most production situations as // it allows Vault to run on multiple machines in a highly-available manner. type ConsulBackend struct { - path string - logger log.Logger - client *api.Client - kv *api.KV - permitPool *physical.PermitPool - serviceLock sync.RWMutex - redirectHost string - redirectPort int64 - serviceName string - serviceTags []string - serviceAddress *string - disableRegistration bool - checkTimeout time.Duration - consistencyMode string - - notifyActiveCh chan notifyEvent - notifySealedCh chan notifyEvent - notifyPerfStandbyCh chan notifyEvent + *csr.ConsulServiceRegistration + + path string + kv *api.KV + permitPool *physical.PermitPool + consistencyMode string sessionTTL string lockWaitTime time.Duration @@ -108,6 +55,15 @@ type ConsulBackend struct { // NewConsulBackend constructs a Consul backend using the given API client // and the prefix in the KV store. func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backend, error) { + + // Create the ConsulServiceRegistration struct that we will embed in the + // ConsulBackend + sreg, err := csr.NewConsulServiceRegistration(conf, logger) + if err != nil { + return nil, err + } + csreg := sreg.(*csr.ConsulServiceRegistration) + // Get the path in Consul path, ok := conf["path"] if !ok { @@ -127,67 +83,6 @@ func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backe path = strings.TrimPrefix(path, "/") } - // Allow admins to disable consul integration - disableReg, ok := conf["disable_registration"] - var disableRegistration bool - if ok && disableReg != "" { - b, err := parseutil.ParseBool(disableReg) - if err != nil { - return nil, errwrap.Wrapf("failed parsing disable_registration parameter: {{err}}", err) - } - disableRegistration = b - } - if logger.IsDebug() { - logger.Debug("config disable_registration set", "disable_registration", disableRegistration) - } - - // Get the service name to advertise in Consul - service, ok := conf["service"] - if !ok { - service = DefaultServiceName - } - if !hostnameRegex.MatchString(service) { - return nil, errors.New("service name must be valid per RFC 1123 and can contain only alphanumeric characters or dashes") - } - if logger.IsDebug() { - logger.Debug("config service set", "service", service) - } - - // Get the additional tags to attach to the registered service name - tags := conf["service_tags"] - if logger.IsDebug() { - logger.Debug("config service_tags set", "service_tags", tags) - } - - // Get the service-specific address to override the use of the HA redirect address - var serviceAddr *string - serviceAddrStr, ok := conf["service_address"] - if ok { - serviceAddr = &serviceAddrStr - } - if logger.IsDebug() { - logger.Debug("config service_address set", "service_address", serviceAddr) - } - - checkTimeout := defaultCheckTimeout - checkTimeoutStr, ok := conf["check_timeout"] - if ok { - d, err := parseutil.ParseDurationSecond(checkTimeoutStr) - if err != nil { - return nil, err - } - - min, _ := DurationMinusBufferDomain(d, checkMinBuffer, checkJitterFactor) - if min < checkMinBuffer { - return nil, fmt.Errorf("consul check_timeout must be greater than %v", min) - } - - checkTimeout = d - if logger.IsDebug() { - logger.Debug("config check_timeout set", "check_timeout", d) - } - } - sessionTTL := api.DefaultLockSessionTTL sessionTTLStr, ok := conf["session_ttl"] if ok { @@ -214,63 +109,6 @@ func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backe } } - // Configure the client - consulConf := api.DefaultConfig() - // Set MaxIdleConnsPerHost to the number of processes used in expiration.Restore - consulConf.Transport.MaxIdleConnsPerHost = consts.ExpirationRestoreWorkerCount - - if addr, ok := conf["address"]; ok { - consulConf.Address = addr - if logger.IsDebug() { - logger.Debug("config address set", "address", addr) - } - - // Copied from the Consul API module; set the Scheme based on - // the protocol field if address looks ike a URL. - // This can enable the TLS configuration below. - parts := strings.SplitN(addr, "://", 2) - if len(parts) == 2 { - if parts[0] == "http" || parts[0] == "https" { - consulConf.Scheme = parts[0] - consulConf.Address = parts[1] - if logger.IsDebug() { - logger.Debug("config address parsed", "scheme", parts[0]) - logger.Debug("config scheme parsed", "address", parts[1]) - } - } // allow "unix:" or whatever else consul supports in the future - } - } - if scheme, ok := conf["scheme"]; ok { - consulConf.Scheme = scheme - if logger.IsDebug() { - logger.Debug("config scheme set", "scheme", scheme) - } - } - if token, ok := conf["token"]; ok { - consulConf.Token = token - logger.Debug("config token set") - } - - if consulConf.Scheme == "https" { - // Use the parsed Address instead of the raw conf['address'] - tlsClientConfig, err := setupTLSConfig(conf, consulConf.Address) - if err != nil { - return nil, err - } - - consulConf.Transport.TLSClientConfig = tlsClientConfig - if err := http2.ConfigureTransport(consulConf.Transport); err != nil { - return nil, err - } - logger.Debug("configured TLS") - } - - consulConf.HttpClient = &http.Client{Transport: consulConf.Transport} - client, err := api.NewClient(consulConf) - if err != nil { - return nil, errwrap.Wrapf("client setup failed: {{err}}", err) - } - maxParStr, ok := conf["max_parallel"] var maxParInt int if ok { @@ -296,92 +134,17 @@ func NewConsulBackend(conf map[string]string, logger log.Logger) (physical.Backe // Setup the backend c := &ConsulBackend{ - path: path, - logger: logger, - client: client, - kv: client.KV(), - permitPool: physical.NewPermitPool(maxParInt), - serviceName: service, - serviceTags: strutil.ParseDedupLowercaseAndSortStrings(tags, ","), - serviceAddress: serviceAddr, - checkTimeout: checkTimeout, - disableRegistration: disableRegistration, - consistencyMode: consistencyMode, - notifyActiveCh: make(chan notifyEvent), - notifySealedCh: make(chan notifyEvent), - notifyPerfStandbyCh: make(chan notifyEvent), - sessionTTL: sessionTTL, - lockWaitTime: lockWaitTime, - } - return c, nil -} - -func setupTLSConfig(conf map[string]string, address string) (*tls.Config, error) { - serverName, _, err := net.SplitHostPort(address) - switch { - case err == nil: - case strings.Contains(err.Error(), "missing port"): - serverName = conf["address"] - default: - return nil, err - } - - insecureSkipVerify := false - tlsSkipVerify, ok := conf["tls_skip_verify"] - - if ok && tlsSkipVerify != "" { - b, err := parseutil.ParseBool(tlsSkipVerify) - if err != nil { - return nil, errwrap.Wrapf("failed parsing tls_skip_verify parameter: {{err}}", err) - } - insecureSkipVerify = b - } + ConsulServiceRegistration: csreg, - tlsMinVersionStr, ok := conf["tls_min_version"] - if !ok { - // Set the default value - tlsMinVersionStr = "tls12" - } + path: path, + kv: csreg.Client.KV(), + permitPool: physical.NewPermitPool(maxParInt), + consistencyMode: consistencyMode, - tlsMinVersion, ok := tlsutil.TLSLookup[tlsMinVersionStr] - if !ok { - return nil, fmt.Errorf("invalid 'tls_min_version'") + sessionTTL: sessionTTL, + lockWaitTime: lockWaitTime, } - - tlsClientConfig := &tls.Config{ - MinVersion: tlsMinVersion, - InsecureSkipVerify: insecureSkipVerify, - ServerName: serverName, - } - - _, okCert := conf["tls_cert_file"] - _, okKey := conf["tls_key_file"] - - if okCert && okKey { - tlsCert, err := tls.LoadX509KeyPair(conf["tls_cert_file"], conf["tls_key_file"]) - if err != nil { - return nil, errwrap.Wrapf("client tls setup failed: {{err}}", err) - } - - tlsClientConfig.Certificates = []tls.Certificate{tlsCert} - } - - if tlsCaFile, ok := conf["tls_ca_file"]; ok { - caPool := x509.NewCertPool() - - data, err := ioutil.ReadFile(tlsCaFile) - if err != nil { - return nil, errwrap.Wrapf("failed to read CA file: {{err}}", err) - } - - if !caPool.AppendCertsFromPEM(data) { - return nil, fmt.Errorf("failed to parse CA certificate") - } - - tlsClientConfig.RootCAs = caPool - } - - return tlsClientConfig, nil + return c, nil } // Used to run multiple entries via a transaction @@ -539,12 +302,12 @@ func (c *ConsulBackend) LockWith(key, value string) (physical.Lock, error) { SessionTTL: c.sessionTTL, LockWaitTime: c.lockWaitTime, } - lock, err := c.client.LockOpts(opts) + lock, err := c.Client.LockOpts(opts) if err != nil { return nil, errwrap.Wrapf("failed to create lock: {{err}}", err) } cl := &ConsulLock{ - client: c.client, + client: c.Client, key: c.path + key, lock: lock, consistencyMode: c.consistencyMode, @@ -560,7 +323,7 @@ func (c *ConsulBackend) HAEnabled() bool { // DetectHostAddr is used to detect the host address by asking the Consul agent func (c *ConsulBackend) DetectHostAddr() (string, error) { - agent := c.client.Agent() + agent := c.Client.Agent() self, err := agent.Self() if err != nil { return "", err @@ -609,322 +372,3 @@ func (c *ConsulLock) Value() (bool, string, error) { value := string(pair.Value) return held, value, nil } - -func (c *ConsulBackend) NotifyActiveStateChange() error { - select { - case c.notifyActiveCh <- notifyEvent{}: - default: - // NOTE: If this occurs Vault's active status could be out of - // sync with Consul until reconcileTimer expires. - c.logger.Warn("concurrent state change notify dropped") - } - - return nil -} - -func (c *ConsulBackend) NotifyPerformanceStandbyStateChange() error { - select { - case c.notifyPerfStandbyCh <- notifyEvent{}: - default: - // NOTE: If this occurs Vault's active status could be out of - // sync with Consul until reconcileTimer expires. - c.logger.Warn("concurrent state change notify dropped") - } - - return nil -} - -func (c *ConsulBackend) NotifySealedStateChange() error { - select { - case c.notifySealedCh <- notifyEvent{}: - default: - // NOTE: If this occurs Vault's sealed status could be out of - // sync with Consul until checkTimer expires. - c.logger.Warn("concurrent sealed state change notify dropped") - } - - return nil -} - -func (c *ConsulBackend) checkDuration() time.Duration { - return DurationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor) -} - -func (c *ConsulBackend) RunServiceDiscovery(waitGroup *sync.WaitGroup, shutdownCh physical.ShutdownChannel, redirectAddr string, activeFunc physical.ActiveFunction, sealedFunc physical.SealedFunction, perfStandbyFunc physical.PerformanceStandbyFunction) (err error) { - if err := c.setRedirectAddr(redirectAddr); err != nil { - return err - } - - // 'server' command will wait for the below goroutine to complete - waitGroup.Add(1) - - go c.runEventDemuxer(waitGroup, shutdownCh, redirectAddr, activeFunc, sealedFunc, perfStandbyFunc) - - return nil -} - -func (c *ConsulBackend) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh physical.ShutdownChannel, redirectAddr string, activeFunc physical.ActiveFunction, sealedFunc physical.SealedFunction, perfStandbyFunc physical.PerformanceStandbyFunction) { - // This defer statement should be executed last. So push it first. - defer waitGroup.Done() - - // Fire the reconcileTimer immediately upon starting the event demuxer - reconcileTimer := time.NewTimer(0) - defer reconcileTimer.Stop() - - // Schedule the first check. Consul TTL checks are passing by - // default, checkTimer does not need to be run immediately. - checkTimer := time.NewTimer(c.checkDuration()) - defer checkTimer.Stop() - - // Use a reactor pattern to handle and dispatch events to singleton - // goroutine handlers for execution. It is not acceptable to drop - // inbound events from Notify*(). - // - // goroutines are dispatched if the demuxer can acquire a lock (via - // an atomic CAS incr) on the handler. Handlers are responsible for - // deregistering themselves (atomic CAS decr). Handlers and the - // demuxer share a lock to synchronize information at the beginning - // and end of a handler's life (or after a handler wakes up from - // sleeping during a back-off/retry). - var shutdown bool - var registeredServiceID string - checkLock := new(int32) - serviceRegLock := new(int32) - - for !shutdown { - select { - case <-c.notifyActiveCh: - // Run reconcile immediately upon active state change notification - reconcileTimer.Reset(0) - case <-c.notifySealedCh: - // Run check timer immediately upon a seal state change notification - checkTimer.Reset(0) - case <-c.notifyPerfStandbyCh: - // Run check timer immediately upon a seal state change notification - checkTimer.Reset(0) - case <-reconcileTimer.C: - // Unconditionally rearm the reconcileTimer - reconcileTimer.Reset(reconcileTimeout - RandomStagger(reconcileTimeout/checkJitterFactor)) - - // Abort if service discovery is disabled or a - // reconcile handler is already active - if !c.disableRegistration && atomic.CompareAndSwapInt32(serviceRegLock, 0, 1) { - // Enter handler with serviceRegLock held - go func() { - defer atomic.CompareAndSwapInt32(serviceRegLock, 1, 0) - for !shutdown { - serviceID, err := c.reconcileConsul(registeredServiceID, activeFunc, sealedFunc, perfStandbyFunc) - if err != nil { - if c.logger.IsWarn() { - c.logger.Warn("reconcile unable to talk with Consul backend", "error", err) - } - time.Sleep(consulRetryInterval) - continue - } - - c.serviceLock.Lock() - defer c.serviceLock.Unlock() - - registeredServiceID = serviceID - return - } - }() - } - case <-checkTimer.C: - checkTimer.Reset(c.checkDuration()) - // Abort if service discovery is disabled or a - // reconcile handler is active - if !c.disableRegistration && atomic.CompareAndSwapInt32(checkLock, 0, 1) { - // Enter handler with checkLock held - go func() { - defer atomic.CompareAndSwapInt32(checkLock, 1, 0) - for !shutdown { - sealed := sealedFunc() - if err := c.runCheck(sealed); err != nil { - if c.logger.IsWarn() { - c.logger.Warn("check unable to talk with Consul backend", "error", err) - } - time.Sleep(consulRetryInterval) - continue - } - return - } - }() - } - case <-shutdownCh: - c.logger.Info("shutting down consul backend") - shutdown = true - } - } - - c.serviceLock.RLock() - defer c.serviceLock.RUnlock() - if err := c.client.Agent().ServiceDeregister(registeredServiceID); err != nil { - if c.logger.IsWarn() { - c.logger.Warn("service deregistration failed", "error", err) - } - } -} - -// checkID returns the ID used for a Consul Check. Assume at least a read -// lock is held. -func (c *ConsulBackend) checkID() string { - return fmt.Sprintf("%s:vault-sealed-check", c.serviceID()) -} - -// serviceID returns the Vault ServiceID for use in Consul. Assume at least -// a read lock is held. -func (c *ConsulBackend) serviceID() string { - return fmt.Sprintf("%s:%s:%d", c.serviceName, c.redirectHost, c.redirectPort) -} - -// reconcileConsul queries the state of Vault Core and Consul and fixes up -// Consul's state according to what's in Vault. reconcileConsul is called -// without any locks held and can be run concurrently, therefore no changes -// to ConsulBackend can be made in this method (i.e. wtb const receiver for -// compiler enforced safety). -func (c *ConsulBackend) reconcileConsul(registeredServiceID string, activeFunc physical.ActiveFunction, sealedFunc physical.SealedFunction, perfStandbyFunc physical.PerformanceStandbyFunction) (serviceID string, err error) { - // Query vault Core for its current state - active := activeFunc() - sealed := sealedFunc() - perfStandby := perfStandbyFunc() - - agent := c.client.Agent() - catalog := c.client.Catalog() - - serviceID = c.serviceID() - - // Get the current state of Vault from Consul - var currentVaultService *api.CatalogService - if services, _, err := catalog.Service(c.serviceName, "", &api.QueryOptions{AllowStale: true}); err == nil { - for _, service := range services { - if serviceID == service.ServiceID { - currentVaultService = service - break - } - } - } - - tags := c.fetchServiceTags(active, perfStandby) - - var reregister bool - - switch { - case currentVaultService == nil, registeredServiceID == "": - reregister = true - default: - switch { - case !strutil.EquivalentSlices(currentVaultService.ServiceTags, tags): - reregister = true - } - } - - if !reregister { - // When re-registration is not required, return a valid serviceID - // to avoid registration in the next cycle. - return serviceID, nil - } - - // If service address was set explicitly in configuration, use that - // as the service-specific address instead of the HA redirect address. - var serviceAddress string - if c.serviceAddress == nil { - serviceAddress = c.redirectHost - } else { - serviceAddress = *c.serviceAddress - } - - service := &api.AgentServiceRegistration{ - ID: serviceID, - Name: c.serviceName, - Tags: tags, - Port: int(c.redirectPort), - Address: serviceAddress, - EnableTagOverride: false, - } - - checkStatus := api.HealthCritical - if !sealed { - checkStatus = api.HealthPassing - } - - sealedCheck := &api.AgentCheckRegistration{ - ID: c.checkID(), - Name: "Vault Sealed Status", - Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", - ServiceID: serviceID, - AgentServiceCheck: api.AgentServiceCheck{ - TTL: c.checkTimeout.String(), - Status: checkStatus, - }, - } - - if err := agent.ServiceRegister(service); err != nil { - return "", errwrap.Wrapf(`service registration failed: {{err}}`, err) - } - - if err := agent.CheckRegister(sealedCheck); err != nil { - return serviceID, errwrap.Wrapf(`service check registration failed: {{err}}`, err) - } - - return serviceID, nil -} - -// runCheck immediately pushes a TTL check. -func (c *ConsulBackend) runCheck(sealed bool) error { - // Run a TTL check - agent := c.client.Agent() - if !sealed { - return agent.PassTTL(c.checkID(), "Vault Unsealed") - } else { - return agent.FailTTL(c.checkID(), "Vault Sealed") - } -} - -// fetchServiceTags returns all of the relevant tags for Consul. -func (c *ConsulBackend) fetchServiceTags(active bool, perfStandby bool) []string { - activeTag := "standby" - if active { - activeTag = "active" - } - - result := append(c.serviceTags, activeTag) - - if perfStandby { - result = append(c.serviceTags, "performance-standby") - } - - return result -} - -func (c *ConsulBackend) setRedirectAddr(addr string) (err error) { - if addr == "" { - return fmt.Errorf("redirect address must not be empty") - } - - url, err := url.Parse(addr) - if err != nil { - return errwrap.Wrapf(fmt.Sprintf("failed to parse redirect URL %q: {{err}}", addr), err) - } - - var portStr string - c.redirectHost, portStr, err = net.SplitHostPort(url.Host) - if err != nil { - if url.Scheme == "http" { - portStr = "80" - } else if url.Scheme == "https" { - portStr = "443" - } else if url.Scheme == "unix" { - portStr = "-1" - c.redirectHost = url.Path - } else { - return errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in redirect address "%v": {{err}}`, url.Host), err) - } - } - c.redirectPort, err = strconv.ParseInt(portStr, 10, 0) - if err != nil || c.redirectPort < -1 || c.redirectPort > 65535 { - return errwrap.Wrapf(fmt.Sprintf(`failed to parse valid port "%v": {{err}}`, portStr), err) - } - - return nil -} diff --git a/physical/consul/consul_test.go b/physical/consul/consul_test.go index b474fb9ae5d8..5d12423daa22 100644 --- a/physical/consul/consul_test.go +++ b/physical/consul/consul_test.go @@ -15,8 +15,8 @@ import ( log "github.com/hashicorp/go-hclog" "github.com/hashicorp/vault/helper/testhelpers/consul" "github.com/hashicorp/vault/sdk/helper/logging" - "github.com/hashicorp/vault/sdk/helper/strutil" "github.com/hashicorp/vault/sdk/physical" + sr "github.com/hashicorp/vault/serviceregistration" ) type consulConf map[string]string @@ -52,7 +52,7 @@ func testConsul_testConsulBackend(t *testing.T) { } } -func testActiveFunc(activePct float64) physical.ActiveFunction { +func testActiveFunc(activePct float64) sr.ActiveFunction { return func() bool { var active bool standbyProb := rand.Float64() @@ -63,7 +63,7 @@ func testActiveFunc(activePct float64) physical.ActiveFunction { } } -func testSealedFunc(sealedPct float64) physical.SealedFunction { +func testSealedFunc(sealedPct float64) sr.SealedFunction { return func() bool { var sealed bool unsealedProb := rand.Float64() @@ -74,7 +74,7 @@ func testSealedFunc(sealedPct float64) physical.SealedFunction { } } -func testPerformanceStandbyFunc(perfPct float64) physical.PerformanceStandbyFunction { +func testPerformanceStandbyFunc(perfPct float64) sr.PerformanceStandbyFunction { return func() bool { var ps bool unsealedProb := rand.Float64() @@ -85,98 +85,6 @@ func testPerformanceStandbyFunc(perfPct float64) physical.PerformanceStandbyFunc } } -func TestConsul_ServiceTags(t *testing.T) { - consulConfig := map[string]string{ - "path": "seaTech/", - "service": "astronomy", - "service_tags": "deadbeef, cafeefac, deadc0de, feedface", - "redirect_addr": "http://127.0.0.2:8200", - "check_timeout": "6s", - "address": "127.0.0.2", - "scheme": "https", - "token": "deadbeef-cafeefac-deadc0de-feedface", - "max_parallel": "4", - "disable_registration": "false", - } - logger := logging.NewVaultLogger(log.Debug) - - be, err := NewConsulBackend(consulConfig, logger) - if err != nil { - t.Fatal(err) - } - - c, ok := be.(*ConsulBackend) - if !ok { - t.Fatalf("failed to create physical Consul backend") - } - - expected := []string{"deadbeef", "cafeefac", "deadc0de", "feedface"} - actual := c.fetchServiceTags(false, false) - if !strutil.EquivalentSlices(actual, append(expected, "standby")) { - t.Fatalf("bad: expected:%s actual:%s", append(expected, "standby"), actual) - } - - actual = c.fetchServiceTags(true, false) - if !strutil.EquivalentSlices(actual, append(expected, "active")) { - t.Fatalf("bad: expected:%s actual:%s", append(expected, "active"), actual) - } - - actual = c.fetchServiceTags(false, true) - if !strutil.EquivalentSlices(actual, append(expected, "performance-standby")) { - t.Fatalf("bad: expected:%s actual:%s", append(expected, "performance-standby"), actual) - } - - actual = c.fetchServiceTags(true, true) - if !strutil.EquivalentSlices(actual, append(expected, "performance-standby")) { - t.Fatalf("bad: expected:%s actual:%s", append(expected, "performance-standby"), actual) - } -} - -func TestConsul_ServiceAddress(t *testing.T) { - tests := []struct { - consulConfig map[string]string - serviceAddrNil bool - }{ - { - consulConfig: map[string]string{ - "service_address": "", - }, - }, - { - consulConfig: map[string]string{ - "service_address": "vault.example.com", - }, - }, - { - serviceAddrNil: true, - }, - } - - for _, test := range tests { - logger := logging.NewVaultLogger(log.Debug) - - be, err := NewConsulBackend(test.consulConfig, logger) - if err != nil { - t.Fatalf("expected Consul to initialize: %v", err) - } - - c, ok := be.(*ConsulBackend) - if !ok { - t.Fatalf("Expected ConsulBackend") - } - - if test.serviceAddrNil { - if c.serviceAddress != nil { - t.Fatalf("expected service address to be nil") - } - } else { - if c.serviceAddress == nil { - t.Fatalf("did not expect service address to be nil") - } - } - } -} - func TestConsul_newConsulBackend(t *testing.T) { tests := []struct { name string @@ -294,40 +202,24 @@ func TestConsul_newConsulBackend(t *testing.T) { if !ok { t.Fatalf("Expected ConsulBackend: %s", test.name) } - c.disableRegistration = true - - if c.disableRegistration == false { - addr := os.Getenv("CONSUL_HTTP_ADDR") - if addr == "" { - continue - } - } - var shutdownCh physical.ShutdownChannel + var shutdownCh sr.ShutdownChannel waitGroup := &sync.WaitGroup{} - if err := c.RunServiceDiscovery(waitGroup, shutdownCh, test.redirectAddr, testActiveFunc(0.5), testSealedFunc(0.5), testPerformanceStandbyFunc(0.5)); err != nil { + if err := c.RunServiceRegistration(waitGroup, shutdownCh, test.redirectAddr, testActiveFunc(0.5), testSealedFunc(0.5), testPerformanceStandbyFunc(0.5)); err != nil { t.Fatalf("bad: %v", err) } - if test.checkTimeout != c.checkTimeout { - t.Errorf("bad: %v != %v", test.checkTimeout, c.checkTimeout) - } - if test.path != c.path { t.Errorf("bad: %s %v != %v", test.name, test.path, c.path) } - if test.service != c.serviceName { - t.Errorf("bad: %v != %v", test.service, c.serviceName) - } - if test.consistencyMode != c.consistencyMode { t.Errorf("bad consistency_mode value: %v != %v", test.consistencyMode, c.consistencyMode) } // The configuration stored in the Consul "client" object is not exported, so // we either have to skip validating it, or add a method to export it, or use reflection. - consulConfig := reflect.Indirect(reflect.ValueOf(c.client)).FieldByName("config") + consulConfig := reflect.Indirect(reflect.ValueOf(c.Client)).FieldByName("config") consulConfigScheme := consulConfig.FieldByName("Scheme").String() consulConfigAddress := consulConfig.FieldByName("Address").String() @@ -346,109 +238,6 @@ func TestConsul_newConsulBackend(t *testing.T) { } } -func TestConsul_serviceTags(t *testing.T) { - tests := []struct { - active bool - perfStandby bool - tags []string - }{ - { - active: true, - perfStandby: false, - tags: []string{"active"}, - }, - { - active: false, - perfStandby: false, - tags: []string{"standby"}, - }, - { - active: false, - perfStandby: true, - tags: []string{"performance-standby"}, - }, - { - active: true, - perfStandby: true, - tags: []string{"performance-standby"}, - }, - } - - c := testConsulBackend(t) - - for _, test := range tests { - tags := c.fetchServiceTags(test.active, test.perfStandby) - if !reflect.DeepEqual(tags[:], test.tags[:]) { - t.Errorf("Bad %v: %v %v", test.active, tags, test.tags) - } - } -} - -func TestConsul_setRedirectAddr(t *testing.T) { - tests := []struct { - addr string - host string - port int64 - pass bool - }{ - { - addr: "http://127.0.0.1:8200/", - host: "127.0.0.1", - port: 8200, - pass: true, - }, - { - addr: "http://127.0.0.1:8200", - host: "127.0.0.1", - port: 8200, - pass: true, - }, - { - addr: "https://127.0.0.1:8200", - host: "127.0.0.1", - port: 8200, - pass: true, - }, - { - addr: "unix:///tmp/.vault.addr.sock", - host: "/tmp/.vault.addr.sock", - port: -1, - pass: true, - }, - { - addr: "127.0.0.1:8200", - pass: false, - }, - { - addr: "127.0.0.1", - pass: false, - }, - } - for _, test := range tests { - c := testConsulBackend(t) - err := c.setRedirectAddr(test.addr) - if test.pass { - if err != nil { - t.Fatalf("bad: %v", err) - } - } else { - if err == nil { - t.Fatalf("bad, expected fail") - } else { - continue - } - } - - if c.redirectHost != test.host { - t.Fatalf("bad: %v != %v", c.redirectHost, test.host) - } - - if c.redirectPort != test.port { - t.Fatalf("bad: %v != %v", c.redirectPort, test.port) - } - } -} - func TestConsul_NotifyActiveStateChange(t *testing.T) { c := testConsulBackend(t) @@ -465,76 +254,6 @@ func TestConsul_NotifySealedStateChange(t *testing.T) { } } -func TestConsul_serviceID(t *testing.T) { - tests := []struct { - name string - redirectAddr string - serviceName string - expected string - valid bool - }{ - { - name: "valid host w/o slash", - redirectAddr: "http://127.0.0.1:8200", - serviceName: "sea-tech-astronomy", - expected: "sea-tech-astronomy:127.0.0.1:8200", - valid: true, - }, - { - name: "valid host w/ slash", - redirectAddr: "http://127.0.0.1:8200/", - serviceName: "sea-tech-astronomy", - expected: "sea-tech-astronomy:127.0.0.1:8200", - valid: true, - }, - { - name: "valid https host w/ slash", - redirectAddr: "https://127.0.0.1:8200/", - serviceName: "sea-tech-astronomy", - expected: "sea-tech-astronomy:127.0.0.1:8200", - valid: true, - }, - { - name: "invalid host name", - redirectAddr: "https://127.0.0.1:8200/", - serviceName: "sea_tech_astronomy", - expected: "", - valid: false, - }, - } - - logger := logging.NewVaultLogger(log.Debug) - - for _, test := range tests { - be, err := NewConsulBackend(consulConf{ - "service": test.serviceName, - }, logger) - if !test.valid { - if err == nil { - t.Fatalf("expected an error initializing for name %q", test.serviceName) - } - continue - } - if test.valid && err != nil { - t.Fatalf("expected Consul to initialize: %v", err) - } - - c, ok := be.(*ConsulBackend) - if !ok { - t.Fatalf("Expected ConsulBackend") - } - - if err := c.setRedirectAddr(test.redirectAddr); err != nil { - t.Fatalf("bad: %s %v", test.name, err) - } - - serviceID := c.serviceID() - if serviceID != test.expected { - t.Fatalf("bad: %v != %v", serviceID, test.expected) - } - } -} - func TestConsulBackend(t *testing.T) { consulToken := os.Getenv("CONSUL_HTTP_TOKEN") addr := os.Getenv("CONSUL_HTTP_ADDR") diff --git a/sdk/physical/physical.go b/sdk/physical/physical.go index ec98e905909c..8cc4e9ab17f1 100644 --- a/sdk/physical/physical.go +++ b/sdk/physical/physical.go @@ -3,7 +3,6 @@ package physical import ( "context" "strings" - "sync" log "github.com/hashicorp/go-hclog" ) @@ -24,9 +23,6 @@ const ( ErrValueTooLarge = "put failed due to value being too large" ) -// ShutdownSignal -type ShutdownChannel chan struct{} - // Backend is the interface required for a physical // backend. A physical backend is used to durably store // data outside of Vault. As such, it is completely untrusted, @@ -76,35 +72,6 @@ type RedirectDetect interface { DetectHostAddr() (string, error) } -// Callback signatures for RunServiceDiscovery -type ActiveFunction func() bool -type SealedFunction func() bool -type PerformanceStandbyFunction func() bool - -// ServiceDiscovery is an optional interface that an HABackend can implement. -// If they do, the state of a backend is advertised to the service discovery -// network. -type ServiceDiscovery interface { - // NotifyActiveStateChange is used by Core to notify a backend - // capable of ServiceDiscovery that this Vault instance has changed - // its status to active or standby. - NotifyActiveStateChange() error - - // NotifySealedStateChange is used by Core to notify a backend - // capable of ServiceDiscovery that Vault has changed its Sealed - // status to sealed or unsealed. - NotifySealedStateChange() error - - // NotifyPerformanceStandbyStateChange is used by Core to notify a backend - // capable of ServiceDiscovery that this Vault instance has changed it - // status to performance standby or standby. - NotifyPerformanceStandbyStateChange() error - - // Run executes any background service discovery tasks until the - // shutdown channel is closed. - RunServiceDiscovery(waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, redirectAddr string, activeFunc ActiveFunction, sealedFunc SealedFunction, perfStandbyFunc PerformanceStandbyFunction) error -} - type Lock interface { // Lock is used to acquire the given lock // The stopCh is optional and if closed should interrupt the lock diff --git a/serviceregistration/consul/consul_service_registration.go b/serviceregistration/consul/consul_service_registration.go new file mode 100644 index 000000000000..2850b9c242c1 --- /dev/null +++ b/serviceregistration/consul/consul_service_registration.go @@ -0,0 +1,641 @@ +package consul + +import ( + "crypto/tls" + "crypto/x509" + "errors" + "fmt" + "io/ioutil" + "math/rand" + "net" + "net/http" + "net/url" + "regexp" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/hashicorp/consul/api" + "github.com/hashicorp/errwrap" + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/vault/sdk/helper/consts" + "github.com/hashicorp/vault/sdk/helper/parseutil" + "github.com/hashicorp/vault/sdk/helper/strutil" + "github.com/hashicorp/vault/sdk/helper/tlsutil" + sr "github.com/hashicorp/vault/serviceregistration" + "golang.org/x/net/http2" +) + +const ( + // checkJitterFactor specifies the jitter factor used to stagger checks + checkJitterFactor = 16 + + // checkMinBuffer specifies provides a guarantee that a check will not + // be executed too close to the TTL check timeout + checkMinBuffer = 100 * time.Millisecond + + // consulRetryInterval specifies the retry duration to use when an + // API call to the Consul agent fails. + consulRetryInterval = 1 * time.Second + + // defaultCheckTimeout changes the timeout of TTL checks + defaultCheckTimeout = 5 * time.Second + + // DefaultServiceName is the default Consul service name used when + // advertising a Vault instance. + DefaultServiceName = "vault" + + // reconcileTimeout is how often Vault should query Consul to detect + // and fix any state drift. + reconcileTimeout = 60 * time.Second +) + +type notifyEvent struct{} + +var _ sr.ServiceRegistration = (*ConsulServiceRegistration)(nil) + +var ( + hostnameRegex = regexp.MustCompile(`^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$`) +) + +// ConsulServiceRegistration is a ServiceRegistration that advertises the state of +// Vault to Consul. +type ConsulServiceRegistration struct { + Client *api.Client + + logger log.Logger + serviceLock sync.RWMutex + redirectHost string + redirectPort int64 + serviceName string + serviceTags []string + serviceAddress *string + disableRegistration bool + checkTimeout time.Duration + + notifyActiveCh chan notifyEvent + notifySealedCh chan notifyEvent + notifyPerfStandbyCh chan notifyEvent +} + +// NewConsulServiceRegistration constructs a Consul-based ServiceRegistration. +func NewConsulServiceRegistration(conf map[string]string, logger log.Logger) (sr.ServiceRegistration, error) { + + // Allow admins to disable consul integration + disableReg, ok := conf["disable_registration"] + var disableRegistration bool + if ok && disableReg != "" { + b, err := parseutil.ParseBool(disableReg) + if err != nil { + return nil, errwrap.Wrapf("failed parsing disable_registration parameter: {{err}}", err) + } + disableRegistration = b + } + if logger.IsDebug() { + logger.Debug("config disable_registration set", "disable_registration", disableRegistration) + } + + // Get the service name to advertise in Consul + service, ok := conf["service"] + if !ok { + service = DefaultServiceName + } + if !hostnameRegex.MatchString(service) { + return nil, errors.New("service name must be valid per RFC 1123 and can contain only alphanumeric characters or dashes") + } + if logger.IsDebug() { + logger.Debug("config service set", "service", service) + } + + // Get the additional tags to attach to the registered service name + tags := conf["service_tags"] + if logger.IsDebug() { + logger.Debug("config service_tags set", "service_tags", tags) + } + + // Get the service-specific address to override the use of the HA redirect address + var serviceAddr *string + serviceAddrStr, ok := conf["service_address"] + if ok { + serviceAddr = &serviceAddrStr + } + if logger.IsDebug() { + logger.Debug("config service_address set", "service_address", serviceAddr) + } + + checkTimeout := defaultCheckTimeout + checkTimeoutStr, ok := conf["check_timeout"] + if ok { + d, err := parseutil.ParseDurationSecond(checkTimeoutStr) + if err != nil { + return nil, err + } + + min, _ := durationMinusBufferDomain(d, checkMinBuffer, checkJitterFactor) + if min < checkMinBuffer { + return nil, fmt.Errorf("consul check_timeout must be greater than %v", min) + } + + checkTimeout = d + if logger.IsDebug() { + logger.Debug("config check_timeout set", "check_timeout", d) + } + } + + // Configure the client + consulConf := api.DefaultConfig() + // Set MaxIdleConnsPerHost to the number of processes used in expiration.Restore + consulConf.Transport.MaxIdleConnsPerHost = consts.ExpirationRestoreWorkerCount + + if addr, ok := conf["address"]; ok { + consulConf.Address = addr + if logger.IsDebug() { + logger.Debug("config address set", "address", addr) + } + + // Copied from the Consul API module; set the Scheme based on + // the protocol field if address looks ike a URL. + // This can enable the TLS configuration below. + parts := strings.SplitN(addr, "://", 2) + if len(parts) == 2 { + if parts[0] == "http" || parts[0] == "https" { + consulConf.Scheme = parts[0] + consulConf.Address = parts[1] + if logger.IsDebug() { + logger.Debug("config address parsed", "scheme", parts[0]) + logger.Debug("config scheme parsed", "address", parts[1]) + } + } // allow "unix:" or whatever else consul supports in the future + } + } + if scheme, ok := conf["scheme"]; ok { + consulConf.Scheme = scheme + if logger.IsDebug() { + logger.Debug("config scheme set", "scheme", scheme) + } + } + if token, ok := conf["token"]; ok { + consulConf.Token = token + logger.Debug("config token set") + } + + if consulConf.Scheme == "https" { + // Use the parsed Address instead of the raw conf['address'] + tlsClientConfig, err := setupTLSConfig(conf, consulConf.Address) + if err != nil { + return nil, err + } + + consulConf.Transport.TLSClientConfig = tlsClientConfig + if err := http2.ConfigureTransport(consulConf.Transport); err != nil { + return nil, err + } + logger.Debug("configured TLS") + } + + consulConf.HttpClient = &http.Client{Transport: consulConf.Transport} + client, err := api.NewClient(consulConf) + if err != nil { + return nil, errwrap.Wrapf("client setup failed: {{err}}", err) + } + + // Setup the backend + c := &ConsulServiceRegistration{ + Client: client, + + logger: logger, + serviceName: service, + serviceTags: strutil.ParseDedupLowercaseAndSortStrings(tags, ","), + serviceAddress: serviceAddr, + checkTimeout: checkTimeout, + disableRegistration: disableRegistration, + + notifyActiveCh: make(chan notifyEvent), + notifySealedCh: make(chan notifyEvent), + notifyPerfStandbyCh: make(chan notifyEvent), + } + return c, nil +} + +func setupTLSConfig(conf map[string]string, address string) (*tls.Config, error) { + serverName, _, err := net.SplitHostPort(address) + switch { + case err == nil: + case strings.Contains(err.Error(), "missing port"): + serverName = conf["address"] + default: + return nil, err + } + + insecureSkipVerify := false + tlsSkipVerify, ok := conf["tls_skip_verify"] + + if ok && tlsSkipVerify != "" { + b, err := parseutil.ParseBool(tlsSkipVerify) + if err != nil { + return nil, errwrap.Wrapf("failed parsing tls_skip_verify parameter: {{err}}", err) + } + insecureSkipVerify = b + } + + tlsMinVersionStr, ok := conf["tls_min_version"] + if !ok { + // Set the default value + tlsMinVersionStr = "tls12" + } + + tlsMinVersion, ok := tlsutil.TLSLookup[tlsMinVersionStr] + if !ok { + return nil, fmt.Errorf("invalid 'tls_min_version'") + } + + tlsClientConfig := &tls.Config{ + MinVersion: tlsMinVersion, + InsecureSkipVerify: insecureSkipVerify, + ServerName: serverName, + } + + _, okCert := conf["tls_cert_file"] + _, okKey := conf["tls_key_file"] + + if okCert && okKey { + tlsCert, err := tls.LoadX509KeyPair(conf["tls_cert_file"], conf["tls_key_file"]) + if err != nil { + return nil, errwrap.Wrapf("client tls setup failed: {{err}}", err) + } + + tlsClientConfig.Certificates = []tls.Certificate{tlsCert} + } + + if tlsCaFile, ok := conf["tls_ca_file"]; ok { + caPool := x509.NewCertPool() + + data, err := ioutil.ReadFile(tlsCaFile) + if err != nil { + return nil, errwrap.Wrapf("failed to read CA file: {{err}}", err) + } + + if !caPool.AppendCertsFromPEM(data) { + return nil, fmt.Errorf("failed to parse CA certificate") + } + + tlsClientConfig.RootCAs = caPool + } + + return tlsClientConfig, nil +} + +func (c *ConsulServiceRegistration) NotifyActiveStateChange() error { + select { + case c.notifyActiveCh <- notifyEvent{}: + default: + // NOTE: If this occurs Vault's active status could be out of + // sync with Consul until reconcileTimer expires. + c.logger.Warn("concurrent state change notify dropped") + } + + return nil +} + +func (c *ConsulServiceRegistration) NotifyPerformanceStandbyStateChange() error { + select { + case c.notifyPerfStandbyCh <- notifyEvent{}: + default: + // NOTE: If this occurs Vault's active status could be out of + // sync with Consul until reconcileTimer expires. + c.logger.Warn("concurrent state change notify dropped") + } + + return nil +} + +func (c *ConsulServiceRegistration) NotifySealedStateChange() error { + select { + case c.notifySealedCh <- notifyEvent{}: + default: + // NOTE: If this occurs Vault's sealed status could be out of + // sync with Consul until checkTimer expires. + c.logger.Warn("concurrent sealed state change notify dropped") + } + + return nil +} + +func (c *ConsulServiceRegistration) checkDuration() time.Duration { + return durationMinusBuffer(c.checkTimeout, checkMinBuffer, checkJitterFactor) +} + +func (c *ConsulServiceRegistration) RunServiceRegistration(waitGroup *sync.WaitGroup, shutdownCh sr.ShutdownChannel, redirectAddr string, activeFunc sr.ActiveFunction, sealedFunc sr.SealedFunction, perfStandbyFunc sr.PerformanceStandbyFunction) (err error) { + if err := c.setRedirectAddr(redirectAddr); err != nil { + return err + } + + // 'server' command will wait for the below goroutine to complete + waitGroup.Add(1) + + go c.runEventDemuxer(waitGroup, shutdownCh, redirectAddr, activeFunc, sealedFunc, perfStandbyFunc) + + return nil +} + +func (c *ConsulServiceRegistration) runEventDemuxer(waitGroup *sync.WaitGroup, shutdownCh sr.ShutdownChannel, redirectAddr string, activeFunc sr.ActiveFunction, sealedFunc sr.SealedFunction, perfStandbyFunc sr.PerformanceStandbyFunction) { + // This defer statement should be executed last. So push it first. + defer waitGroup.Done() + + // Fire the reconcileTimer immediately upon starting the event demuxer + reconcileTimer := time.NewTimer(0) + defer reconcileTimer.Stop() + + // Schedule the first check. Consul TTL checks are passing by + // default, checkTimer does not need to be run immediately. + checkTimer := time.NewTimer(c.checkDuration()) + defer checkTimer.Stop() + + // Use a reactor pattern to handle and dispatch events to singleton + // goroutine handlers for execution. It is not acceptable to drop + // inbound events from Notify*(). + // + // goroutines are dispatched if the demuxer can acquire a lock (via + // an atomic CAS incr) on the handler. Handlers are responsible for + // deregistering themselves (atomic CAS decr). Handlers and the + // demuxer share a lock to synchronize information at the beginning + // and end of a handler's life (or after a handler wakes up from + // sleeping during a back-off/retry). + var shutdown bool + var registeredServiceID string + checkLock := new(int32) + serviceRegLock := new(int32) + + for !shutdown { + select { + case <-c.notifyActiveCh: + // Run reconcile immediately upon active state change notification + reconcileTimer.Reset(0) + case <-c.notifySealedCh: + // Run check timer immediately upon a seal state change notification + checkTimer.Reset(0) + case <-c.notifyPerfStandbyCh: + // Run check timer immediately upon a seal state change notification + checkTimer.Reset(0) + case <-reconcileTimer.C: + // Unconditionally rearm the reconcileTimer + reconcileTimer.Reset(reconcileTimeout - randomStagger(reconcileTimeout/checkJitterFactor)) + + // Abort if service discovery is disabled or a + // reconcile handler is already active + if !c.disableRegistration && atomic.CompareAndSwapInt32(serviceRegLock, 0, 1) { + // Enter handler with serviceRegLock held + go func() { + defer atomic.CompareAndSwapInt32(serviceRegLock, 1, 0) + for !shutdown { + serviceID, err := c.reconcileConsul(registeredServiceID, activeFunc, sealedFunc, perfStandbyFunc) + if err != nil { + if c.logger.IsWarn() { + c.logger.Warn("reconcile unable to talk with Consul backend", "error", err) + } + time.Sleep(consulRetryInterval) + continue + } + + c.serviceLock.Lock() + defer c.serviceLock.Unlock() + + registeredServiceID = serviceID + return + } + }() + } + case <-checkTimer.C: + checkTimer.Reset(c.checkDuration()) + // Abort if service discovery is disabled or a + // reconcile handler is active + if !c.disableRegistration && atomic.CompareAndSwapInt32(checkLock, 0, 1) { + // Enter handler with checkLock held + go func() { + defer atomic.CompareAndSwapInt32(checkLock, 1, 0) + for !shutdown { + sealed := sealedFunc() + if err := c.runCheck(sealed); err != nil { + if c.logger.IsWarn() { + c.logger.Warn("check unable to talk with Consul backend", "error", err) + } + time.Sleep(consulRetryInterval) + continue + } + return + } + }() + } + case <-shutdownCh: + c.logger.Info("shutting down consul backend") + shutdown = true + } + } + + c.serviceLock.RLock() + defer c.serviceLock.RUnlock() + if err := c.Client.Agent().ServiceDeregister(registeredServiceID); err != nil { + if c.logger.IsWarn() { + c.logger.Warn("service deregistration failed", "error", err) + } + } +} + +// checkID returns the ID used for a Consul Check. Assume at least a read +// lock is held. +func (c *ConsulServiceRegistration) checkID() string { + return fmt.Sprintf("%s:vault-sealed-check", c.serviceID()) +} + +// serviceID returns the Vault ServiceID for use in Consul. Assume at least +// a read lock is held. +func (c *ConsulServiceRegistration) serviceID() string { + return fmt.Sprintf("%s:%s:%d", c.serviceName, c.redirectHost, c.redirectPort) +} + +// reconcileConsul queries the state of Vault Core and Consul and fixes up +// Consul's state according to what's in Vault. reconcileConsul is called +// without any locks held and can be run concurrently, therefore no changes +// to ConsulServiceRegistration can be made in this method (i.e. wtb const receiver for +// compiler enforced safety). +func (c *ConsulServiceRegistration) reconcileConsul(registeredServiceID string, activeFunc sr.ActiveFunction, sealedFunc sr.SealedFunction, perfStandbyFunc sr.PerformanceStandbyFunction) (serviceID string, err error) { + // Query vault Core for its current state + active := activeFunc() + sealed := sealedFunc() + perfStandby := perfStandbyFunc() + + agent := c.Client.Agent() + catalog := c.Client.Catalog() + + serviceID = c.serviceID() + + // Get the current state of Vault from Consul + var currentVaultService *api.CatalogService + if services, _, err := catalog.Service(c.serviceName, "", &api.QueryOptions{AllowStale: true}); err == nil { + for _, service := range services { + if serviceID == service.ServiceID { + currentVaultService = service + break + } + } + } + + tags := c.fetchServiceTags(active, perfStandby) + + var reregister bool + + switch { + case currentVaultService == nil, registeredServiceID == "": + reregister = true + default: + switch { + case !strutil.EquivalentSlices(currentVaultService.ServiceTags, tags): + reregister = true + } + } + + if !reregister { + // When re-registration is not required, return a valid serviceID + // to avoid registration in the next cycle. + return serviceID, nil + } + + // If service address was set explicitly in configuration, use that + // as the service-specific address instead of the HA redirect address. + var serviceAddress string + if c.serviceAddress == nil { + serviceAddress = c.redirectHost + } else { + serviceAddress = *c.serviceAddress + } + + service := &api.AgentServiceRegistration{ + ID: serviceID, + Name: c.serviceName, + Tags: tags, + Port: int(c.redirectPort), + Address: serviceAddress, + EnableTagOverride: false, + } + + checkStatus := api.HealthCritical + if !sealed { + checkStatus = api.HealthPassing + } + + sealedCheck := &api.AgentCheckRegistration{ + ID: c.checkID(), + Name: "Vault Sealed Status", + Notes: "Vault service is healthy when Vault is in an unsealed status and can become an active Vault server", + ServiceID: serviceID, + AgentServiceCheck: api.AgentServiceCheck{ + TTL: c.checkTimeout.String(), + Status: checkStatus, + }, + } + + if err := agent.ServiceRegister(service); err != nil { + return "", errwrap.Wrapf(`service registration failed: {{err}}`, err) + } + + if err := agent.CheckRegister(sealedCheck); err != nil { + return serviceID, errwrap.Wrapf(`service check registration failed: {{err}}`, err) + } + + return serviceID, nil +} + +// runCheck immediately pushes a TTL check. +func (c *ConsulServiceRegistration) runCheck(sealed bool) error { + // Run a TTL check + agent := c.Client.Agent() + if !sealed { + return agent.PassTTL(c.checkID(), "Vault Unsealed") + } else { + return agent.FailTTL(c.checkID(), "Vault Sealed") + } +} + +// fetchServiceTags returns all of the relevant tags for Consul. +func (c *ConsulServiceRegistration) fetchServiceTags(active bool, perfStandby bool) []string { + activeTag := "standby" + if active { + activeTag = "active" + } + + result := append(c.serviceTags, activeTag) + + if perfStandby { + result = append(c.serviceTags, "performance-standby") + } + + return result +} + +func (c *ConsulServiceRegistration) setRedirectAddr(addr string) (err error) { + if addr == "" { + return fmt.Errorf("redirect address must not be empty") + } + + url, err := url.Parse(addr) + if err != nil { + return errwrap.Wrapf(fmt.Sprintf("failed to parse redirect URL %q: {{err}}", addr), err) + } + + var portStr string + c.redirectHost, portStr, err = net.SplitHostPort(url.Host) + if err != nil { + if url.Scheme == "http" { + portStr = "80" + } else if url.Scheme == "https" { + portStr = "443" + } else if url.Scheme == "unix" { + portStr = "-1" + c.redirectHost = url.Path + } else { + return errwrap.Wrapf(fmt.Sprintf(`failed to find a host:port in redirect address "%v": {{err}}`, url.Host), err) + } + } + c.redirectPort, err = strconv.ParseInt(portStr, 10, 0) + if err != nil || c.redirectPort < -1 || c.redirectPort > 65535 { + return errwrap.Wrapf(fmt.Sprintf(`failed to parse valid port "%v": {{err}}`, portStr), err) + } + + return nil +} + +// durationMinusBuffer returns a duration, minus a buffer and jitter +// subtracted from the duration. This function is used primarily for +// servicing Consul TTL Checks in advance of the TTL. +func durationMinusBuffer(intv time.Duration, buffer time.Duration, jitter int64) time.Duration { + d := intv - buffer + if jitter == 0 { + d -= randomStagger(d) + } else { + d -= randomStagger(time.Duration(int64(d) / jitter)) + } + return d +} + +// durationMinusBufferDomain returns the domain of valid durations from a +// call to durationMinusBuffer. This function is used to check user +// specified input values to durationMinusBuffer. +func durationMinusBufferDomain(intv time.Duration, buffer time.Duration, jitter int64) (min time.Duration, max time.Duration) { + max = intv - buffer + if jitter == 0 { + min = max + } else { + min = max - time.Duration(int64(max)/jitter) + } + return min, max +} + +// randomStagger returns an interval between 0 and the duration +func randomStagger(intv time.Duration) time.Duration { + if intv == 0 { + return 0 + } + return time.Duration(uint64(rand.Int63()) % uint64(intv)) +} diff --git a/serviceregistration/consul/consul_service_registration_test.go b/serviceregistration/consul/consul_service_registration_test.go new file mode 100644 index 000000000000..730346d9a7ca --- /dev/null +++ b/serviceregistration/consul/consul_service_registration_test.go @@ -0,0 +1,632 @@ +package consul + +import ( + "math/rand" + "os" + "reflect" + "sync" + "testing" + "time" + + "github.com/go-test/deep" + "github.com/hashicorp/consul/api" + log "github.com/hashicorp/go-hclog" + "github.com/hashicorp/vault/helper/testhelpers/consul" + "github.com/hashicorp/vault/sdk/helper/logging" + "github.com/hashicorp/vault/sdk/helper/strutil" + "github.com/hashicorp/vault/sdk/physical" + "github.com/hashicorp/vault/sdk/physical/inmem" + sr "github.com/hashicorp/vault/serviceregistration" + "github.com/hashicorp/vault/vault" +) + +type consulConf map[string]string + +func testConsulServiceRegistration(t *testing.T) *ConsulServiceRegistration { + return testConsulServiceRegistrationConfig(t, &consulConf{}) +} + +func testConsulServiceRegistrationConfig(t *testing.T, conf *consulConf) *ConsulServiceRegistration { + logger := logging.NewVaultLogger(log.Debug) + + be, err := NewConsulServiceRegistration(*conf, logger) + if err != nil { + t.Fatalf("Expected Consul to initialize: %v", err) + } + + c, ok := be.(*ConsulServiceRegistration) + if !ok { + t.Fatalf("Expected ConsulServiceRegistration") + } + + return c +} + +func testActiveFunc(activePct float64) sr.ActiveFunction { + return func() bool { + var active bool + standbyProb := rand.Float64() + if standbyProb > activePct { + active = true + } + return active + } +} + +func testSealedFunc(sealedPct float64) sr.SealedFunction { + return func() bool { + var sealed bool + unsealedProb := rand.Float64() + if unsealedProb > sealedPct { + sealed = true + } + return sealed + } +} + +func testPerformanceStandbyFunc(perfPct float64) sr.PerformanceStandbyFunction { + return func() bool { + var ps bool + unsealedProb := rand.Float64() + if unsealedProb > perfPct { + ps = true + } + return ps + } +} + +// TestConsul_ServiceRegistration tests whether consul ServiceRegistration works +func TestConsul_ServiceRegistration(t *testing.T) { + + // Prepare a docker-based consul instance + cleanup, addr, token := consul.PrepareTestContainer(t, "1.4.0-rc1") + defer cleanup() + + // Create a consul client + cfg := api.DefaultConfig() + cfg.Address = addr + cfg.Token = token + client, err := api.NewClient(cfg) + if err != nil { + t.Fatal(err) + } + + // transitionFrom waits patiently for the services in the Consul catalog to + // transition from a known value, and then returns the new value. + transitionFrom := func(t *testing.T, known map[string][]string) map[string][]string { + t.Helper() + // Wait for up to 10 seconds + for i := 0; i < 10; i++ { + services, _, err := client.Catalog().Services(nil) + if err != nil { + t.Fatal(err) + } + if diff := deep.Equal(services, known); diff != nil { + return services + } + time.Sleep(time.Second) + } + t.Fatalf("Catalog Services never transitioned from %v", known) + return nil + } + + // Create a ServiceRegistration that points to our consul instance + logger := logging.NewVaultLogger(log.Trace) + sd, err := NewConsulServiceRegistration(map[string]string{ + "address": addr, + "token": token, + }, logger) + if err != nil { + t.Fatal(err) + } + + // Create the core + inm, err := inmem.NewInmemHA(nil, logger) + if err != nil { + t.Fatal(err) + } + inmha, err := inmem.NewInmemHA(nil, logger) + if err != nil { + t.Fatal(err) + } + const redirectAddr = "http://127.0.0.1:8200" + core, err := vault.NewCore(&vault.CoreConfig{ + ServiceRegistration: sd, + Physical: inm, + HAPhysical: inmha.(physical.HABackend), + RedirectAddr: redirectAddr, + DisableMlock: true, + }) + if err != nil { + t.Fatal(err) + } + + // Vault should not yet be registered with Consul + services, _, err := client.Catalog().Services(nil) + if err != nil { + t.Fatal(err) + } + if diff := deep.Equal(services, map[string][]string{ + "consul": []string{}, + }); diff != nil { + t.Fatal(diff) + } + + // Run service discovery on the core + wg := &sync.WaitGroup{} + var shutdown chan struct{} + activeFunc := func() bool { + if isLeader, _, _, err := core.Leader(); err == nil { + return isLeader + } + return false + } + err = sd.RunServiceRegistration( + wg, shutdown, redirectAddr, activeFunc, core.Sealed, core.PerfStandby) + if err != nil { + t.Fatal(err) + } + + // Vault should soon be registered with Consul in standby mode + services = transitionFrom(t, map[string][]string{ + "consul": []string{}, + }) + if diff := deep.Equal(services, map[string][]string{ + "consul": []string{}, + "vault": []string{"standby"}, + }); diff != nil { + t.Fatal(diff) + } + + // Initialize and unseal the core + keys, _ := vault.TestCoreInit(t, core) + for _, key := range keys { + if _, err := vault.TestCoreUnseal(core, vault.TestKeyCopy(key)); err != nil { + t.Fatalf("unseal err: %s", err) + } + } + if core.Sealed() { + t.Fatal("should not be sealed") + } + + // Wait for the core to become active + vault.TestWaitActive(t, core) + + // Vault should soon be registered with Consul in active mode + services = transitionFrom(t, map[string][]string{ + "consul": []string{}, + "vault": []string{"standby"}, + }) + if diff := deep.Equal(services, map[string][]string{ + "consul": []string{}, + "vault": []string{"active"}, + }); diff != nil { + t.Fatal(diff) + } +} + +func TestConsul_ServiceTags(t *testing.T) { + consulConfig := map[string]string{ + "path": "seaTech/", + "service": "astronomy", + "service_tags": "deadbeef, cafeefac, deadc0de, feedface", + "redirect_addr": "http://127.0.0.2:8200", + "check_timeout": "6s", + "address": "127.0.0.2", + "scheme": "https", + "token": "deadbeef-cafeefac-deadc0de-feedface", + "max_parallel": "4", + "disable_registration": "false", + } + logger := logging.NewVaultLogger(log.Debug) + + be, err := NewConsulServiceRegistration(consulConfig, logger) + if err != nil { + t.Fatal(err) + } + + c, ok := be.(*ConsulServiceRegistration) + if !ok { + t.Fatalf("failed to create physical Consul backend") + } + + expected := []string{"deadbeef", "cafeefac", "deadc0de", "feedface"} + actual := c.fetchServiceTags(false, false) + if !strutil.EquivalentSlices(actual, append(expected, "standby")) { + t.Fatalf("bad: expected:%s actual:%s", append(expected, "standby"), actual) + } + + actual = c.fetchServiceTags(true, false) + if !strutil.EquivalentSlices(actual, append(expected, "active")) { + t.Fatalf("bad: expected:%s actual:%s", append(expected, "active"), actual) + } + + actual = c.fetchServiceTags(false, true) + if !strutil.EquivalentSlices(actual, append(expected, "performance-standby")) { + t.Fatalf("bad: expected:%s actual:%s", append(expected, "performance-standby"), actual) + } + + actual = c.fetchServiceTags(true, true) + if !strutil.EquivalentSlices(actual, append(expected, "performance-standby")) { + t.Fatalf("bad: expected:%s actual:%s", append(expected, "performance-standby"), actual) + } +} + +func TestConsul_ServiceAddress(t *testing.T) { + tests := []struct { + consulConfig map[string]string + serviceAddrNil bool + }{ + { + consulConfig: map[string]string{ + "service_address": "", + }, + }, + { + consulConfig: map[string]string{ + "service_address": "vault.example.com", + }, + }, + { + serviceAddrNil: true, + }, + } + + for _, test := range tests { + logger := logging.NewVaultLogger(log.Debug) + + be, err := NewConsulServiceRegistration(test.consulConfig, logger) + if err != nil { + t.Fatalf("expected Consul to initialize: %v", err) + } + + c, ok := be.(*ConsulServiceRegistration) + if !ok { + t.Fatalf("Expected ConsulServiceRegistration") + } + + if test.serviceAddrNil { + if c.serviceAddress != nil { + t.Fatalf("expected service address to be nil") + } + } else { + if c.serviceAddress == nil { + t.Fatalf("did not expect service address to be nil") + } + } + } +} + +func TestConsul_newConsulServiceRegistration(t *testing.T) { + tests := []struct { + name string + consulConfig map[string]string + fail bool + redirectAddr string + checkTimeout time.Duration + path string + service string + address string + scheme string + token string + max_parallel int + disableReg bool + consistencyMode string + }{ + { + name: "Valid default config", + consulConfig: map[string]string{}, + checkTimeout: 5 * time.Second, + redirectAddr: "http://127.0.0.1:8200", + path: "vault/", + service: "vault", + address: "127.0.0.1:8500", + scheme: "http", + token: "", + max_parallel: 4, + disableReg: false, + consistencyMode: "default", + }, + { + name: "Valid modified config", + consulConfig: map[string]string{ + "path": "seaTech/", + "service": "astronomy", + "redirect_addr": "http://127.0.0.2:8200", + "check_timeout": "6s", + "address": "127.0.0.2", + "scheme": "https", + "token": "deadbeef-cafeefac-deadc0de-feedface", + "max_parallel": "4", + "disable_registration": "false", + "consistency_mode": "strong", + }, + checkTimeout: 6 * time.Second, + path: "seaTech/", + service: "astronomy", + redirectAddr: "http://127.0.0.2:8200", + address: "127.0.0.2", + scheme: "https", + token: "deadbeef-cafeefac-deadc0de-feedface", + max_parallel: 4, + consistencyMode: "strong", + }, + { + name: "Unix socket", + consulConfig: map[string]string{ + "address": "unix:///tmp/.consul.http.sock", + }, + address: "/tmp/.consul.http.sock", + scheme: "http", // Default, not overridden? + + // Defaults + checkTimeout: 5 * time.Second, + redirectAddr: "http://127.0.0.1:8200", + path: "vault/", + service: "vault", + token: "", + max_parallel: 4, + disableReg: false, + consistencyMode: "default", + }, + { + name: "Scheme in address", + consulConfig: map[string]string{ + "address": "https://127.0.0.2:5000", + }, + address: "127.0.0.2:5000", + scheme: "https", + + // Defaults + checkTimeout: 5 * time.Second, + redirectAddr: "http://127.0.0.1:8200", + path: "vault/", + service: "vault", + token: "", + max_parallel: 4, + disableReg: false, + consistencyMode: "default", + }, + { + name: "check timeout too short", + fail: true, + consulConfig: map[string]string{ + "check_timeout": "99ms", + }, + }, + } + + for _, test := range tests { + logger := logging.NewVaultLogger(log.Debug) + + be, err := NewConsulServiceRegistration(test.consulConfig, logger) + if test.fail { + if err == nil { + t.Fatalf(`Expected config "%s" to fail`, test.name) + } else { + continue + } + } else if !test.fail && err != nil { + t.Fatalf("Expected config %s to not fail: %v", test.name, err) + } + + c, ok := be.(*ConsulServiceRegistration) + if !ok { + t.Fatalf("Expected ConsulServiceRegistration: %s", test.name) + } + c.disableRegistration = true + + if c.disableRegistration == false { + addr := os.Getenv("CONSUL_HTTP_ADDR") + if addr == "" { + continue + } + } + + var shutdownCh sr.ShutdownChannel + waitGroup := &sync.WaitGroup{} + if err := c.RunServiceRegistration(waitGroup, shutdownCh, test.redirectAddr, testActiveFunc(0.5), testSealedFunc(0.5), testPerformanceStandbyFunc(0.5)); err != nil { + t.Fatalf("bad: %v", err) + } + + if test.checkTimeout != c.checkTimeout { + t.Errorf("bad: %v != %v", test.checkTimeout, c.checkTimeout) + } + + if test.service != c.serviceName { + t.Errorf("bad: %v != %v", test.service, c.serviceName) + } + + // The configuration stored in the Consul "client" object is not exported, so + // we either have to skip validating it, or add a method to export it, or use reflection. + consulConfig := reflect.Indirect(reflect.ValueOf(c.Client)).FieldByName("config") + consulConfigScheme := consulConfig.FieldByName("Scheme").String() + consulConfigAddress := consulConfig.FieldByName("Address").String() + + if test.scheme != consulConfigScheme { + t.Errorf("bad scheme value: %v != %v", test.scheme, consulConfigScheme) + } + + if test.address != consulConfigAddress { + t.Errorf("bad address value: %v != %v", test.address, consulConfigAddress) + } + + // FIXME(sean@): Unable to test max_parallel + // if test.max_parallel != cap(c.permitPool) { + // t.Errorf("bad: %v != %v", test.max_parallel, cap(c.permitPool)) + // } + } +} + +func TestConsul_serviceTags(t *testing.T) { + tests := []struct { + active bool + perfStandby bool + tags []string + }{ + { + active: true, + perfStandby: false, + tags: []string{"active"}, + }, + { + active: false, + perfStandby: false, + tags: []string{"standby"}, + }, + { + active: false, + perfStandby: true, + tags: []string{"performance-standby"}, + }, + { + active: true, + perfStandby: true, + tags: []string{"performance-standby"}, + }, + } + + c := testConsulServiceRegistration(t) + + for _, test := range tests { + tags := c.fetchServiceTags(test.active, test.perfStandby) + if !reflect.DeepEqual(tags[:], test.tags[:]) { + t.Errorf("Bad %v: %v %v", test.active, tags, test.tags) + } + } +} + +func TestConsul_setRedirectAddr(t *testing.T) { + tests := []struct { + addr string + host string + port int64 + pass bool + }{ + { + addr: "http://127.0.0.1:8200/", + host: "127.0.0.1", + port: 8200, + pass: true, + }, + { + addr: "http://127.0.0.1:8200", + host: "127.0.0.1", + port: 8200, + pass: true, + }, + { + addr: "https://127.0.0.1:8200", + host: "127.0.0.1", + port: 8200, + pass: true, + }, + { + addr: "unix:///tmp/.vault.addr.sock", + host: "/tmp/.vault.addr.sock", + port: -1, + pass: true, + }, + { + addr: "127.0.0.1:8200", + pass: false, + }, + { + addr: "127.0.0.1", + pass: false, + }, + } + for _, test := range tests { + c := testConsulServiceRegistration(t) + err := c.setRedirectAddr(test.addr) + if test.pass { + if err != nil { + t.Fatalf("bad: %v", err) + } + } else { + if err == nil { + t.Fatalf("bad, expected fail") + } else { + continue + } + } + + if c.redirectHost != test.host { + t.Fatalf("bad: %v != %v", c.redirectHost, test.host) + } + + if c.redirectPort != test.port { + t.Fatalf("bad: %v != %v", c.redirectPort, test.port) + } + } +} + +func TestConsul_serviceID(t *testing.T) { + tests := []struct { + name string + redirectAddr string + serviceName string + expected string + valid bool + }{ + { + name: "valid host w/o slash", + redirectAddr: "http://127.0.0.1:8200", + serviceName: "sea-tech-astronomy", + expected: "sea-tech-astronomy:127.0.0.1:8200", + valid: true, + }, + { + name: "valid host w/ slash", + redirectAddr: "http://127.0.0.1:8200/", + serviceName: "sea-tech-astronomy", + expected: "sea-tech-astronomy:127.0.0.1:8200", + valid: true, + }, + { + name: "valid https host w/ slash", + redirectAddr: "https://127.0.0.1:8200/", + serviceName: "sea-tech-astronomy", + expected: "sea-tech-astronomy:127.0.0.1:8200", + valid: true, + }, + { + name: "invalid host name", + redirectAddr: "https://127.0.0.1:8200/", + serviceName: "sea_tech_astronomy", + expected: "", + valid: false, + }, + } + + logger := logging.NewVaultLogger(log.Debug) + + for _, test := range tests { + be, err := NewConsulServiceRegistration(consulConf{ + "service": test.serviceName, + }, logger) + if !test.valid { + if err == nil { + t.Fatalf("expected an error initializing for name %q", test.serviceName) + } + continue + } + if test.valid && err != nil { + t.Fatalf("expected Consul to initialize: %v", err) + } + + c, ok := be.(*ConsulServiceRegistration) + if !ok { + t.Fatalf("Expected ConsulServiceRegistration") + } + + if err := c.setRedirectAddr(test.redirectAddr); err != nil { + t.Fatalf("bad: %s %v", test.name, err) + } + + serviceID := c.serviceID() + if serviceID != test.expected { + t.Fatalf("bad: %v != %v", serviceID, test.expected) + } + } +} diff --git a/serviceregistration/service_registration.go b/serviceregistration/service_registration.go new file mode 100644 index 000000000000..8f23c9ccdddf --- /dev/null +++ b/serviceregistration/service_registration.go @@ -0,0 +1,40 @@ +package serviceregistration + +import ( + "sync" + + log "github.com/hashicorp/go-hclog" +) + +// Factory is the factory function to create a ServiceRegistration. +type Factory func(config map[string]string, logger log.Logger) (ServiceRegistration, error) + +// ServiceRegistration is an interface that advertises the state of Vault to a +// service discovery network. +type ServiceRegistration interface { + // NotifyActiveStateChange is used by Core to notify that this Vault + // instance has changed its status to active or standby. + NotifyActiveStateChange() error + + // NotifySealedStateChange is used by Core to notify that Vault has changed + // its Sealed status to sealed or unsealed. + NotifySealedStateChange() error + + // NotifyPerformanceStandbyStateChange is used by Core to notify that this + // Vault instance has changed it status to performance standby or standby. + NotifyPerformanceStandbyStateChange() error + + // Run executes any background service discovery tasks until the + // shutdown channel is closed. + RunServiceRegistration( + waitGroup *sync.WaitGroup, shutdownCh ShutdownChannel, redirectAddr string, + activeFunc ActiveFunction, sealedFunc SealedFunction, perfStandbyFunc PerformanceStandbyFunction) error +} + +// Callback signatures for RunServiceRegistration +type ActiveFunction func() bool +type SealedFunction func() bool +type PerformanceStandbyFunction func() bool + +// ShutdownChannel is the shutdown signal for RunServiceRegistration +type ShutdownChannel chan struct{} diff --git a/vault/core.go b/vault/core.go index fc0fdb91136d..261296270498 100644 --- a/vault/core.go +++ b/vault/core.go @@ -39,6 +39,7 @@ import ( "github.com/hashicorp/vault/sdk/helper/tlsutil" "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/sdk/physical" + sr "github.com/hashicorp/vault/serviceregistration" "github.com/hashicorp/vault/shamir" "github.com/hashicorp/vault/vault/cluster" "github.com/hashicorp/vault/vault/seal" @@ -192,6 +193,9 @@ type Core struct { // physical backend is the un-trusted backend with durable data physical physical.Backend + // serviceRegistration is the ServiceRegistration network + serviceRegistration sr.ServiceRegistration + // underlyingPhysical will always point to the underlying backend // implementation. This is an un-trusted backend with durable data underlyingPhysical physical.Backend @@ -482,60 +486,62 @@ type Core struct { // CoreConfig is used to parameterize a core type CoreConfig struct { - DevToken string `json:"dev_token" structs:"dev_token" mapstructure:"dev_token"` + DevToken string - BuiltinRegistry BuiltinRegistry `json:"builtin_registry" structs:"builtin_registry" mapstructure:"builtin_registry"` + BuiltinRegistry BuiltinRegistry - LogicalBackends map[string]logical.Factory `json:"logical_backends" structs:"logical_backends" mapstructure:"logical_backends"` + LogicalBackends map[string]logical.Factory - CredentialBackends map[string]logical.Factory `json:"credential_backends" structs:"credential_backends" mapstructure:"credential_backends"` + CredentialBackends map[string]logical.Factory - AuditBackends map[string]audit.Factory `json:"audit_backends" structs:"audit_backends" mapstructure:"audit_backends"` + AuditBackends map[string]audit.Factory - Physical physical.Backend `json:"physical" structs:"physical" mapstructure:"physical"` + Physical physical.Backend - StorageType string `json:"storage_type" structs:"storage_type" mapstructure:"storage_type"` + StorageType string // May be nil, which disables HA operations - HAPhysical physical.HABackend `json:"ha_physical" structs:"ha_physical" mapstructure:"ha_physical"` + HAPhysical physical.HABackend + + ServiceRegistration sr.ServiceRegistration - Seal Seal `json:"seal" structs:"seal" mapstructure:"seal"` + Seal Seal - SecureRandomReader io.Reader `json:"secure_random_reader" structs:"secure_random_reader" mapstructure:"secure_random_reader"` + SecureRandomReader io.Reader - Logger log.Logger `json:"logger" structs:"logger" mapstructure:"logger"` + Logger log.Logger // Disables the LRU cache on the physical backend - DisableCache bool `json:"disable_cache" structs:"disable_cache" mapstructure:"disable_cache"` + DisableCache bool // Disables mlock syscall - DisableMlock bool `json:"disable_mlock" structs:"disable_mlock" mapstructure:"disable_mlock"` + DisableMlock bool // Custom cache size for the LRU cache on the physical backend, or zero for default - CacheSize int `json:"cache_size" structs:"cache_size" mapstructure:"cache_size"` + CacheSize int // Set as the leader address for HA - RedirectAddr string `json:"redirect_addr" structs:"redirect_addr" mapstructure:"redirect_addr"` + RedirectAddr string // Set as the cluster address for HA - ClusterAddr string `json:"cluster_addr" structs:"cluster_addr" mapstructure:"cluster_addr"` + ClusterAddr string - DefaultLeaseTTL time.Duration `json:"default_lease_ttl" structs:"default_lease_ttl" mapstructure:"default_lease_ttl"` + DefaultLeaseTTL time.Duration - MaxLeaseTTL time.Duration `json:"max_lease_ttl" structs:"max_lease_ttl" mapstructure:"max_lease_ttl"` + MaxLeaseTTL time.Duration - ClusterName string `json:"cluster_name" structs:"cluster_name" mapstructure:"cluster_name"` + ClusterName string - ClusterCipherSuites string `json:"cluster_cipher_suites" structs:"cluster_cipher_suites" mapstructure:"cluster_cipher_suites"` + ClusterCipherSuites string - EnableUI bool `json:"ui" structs:"ui" mapstructure:"ui"` + EnableUI bool // Enable the raw endpoint - EnableRaw bool `json:"enable_raw" structs:"enable_raw" mapstructure:"enable_raw"` + EnableRaw bool - PluginDirectory string `json:"plugin_directory" structs:"plugin_directory" mapstructure:"plugin_directory"` + PluginDirectory string - DisableSealWrap bool `json:"disable_sealwrap" structs:"disable_sealwrap" mapstructure:"disable_sealwrap"` + DisableSealWrap bool RawConfig *server.Config @@ -569,6 +575,7 @@ func (c *CoreConfig) Clone() *CoreConfig { AuditBackends: c.AuditBackends, Physical: c.Physical, HAPhysical: c.HAPhysical, + ServiceRegistration: c.ServiceRegistration, Seal: c.Seal, Logger: c.Logger, DisableCache: c.DisableCache, @@ -596,6 +603,26 @@ func (c *CoreConfig) Clone() *CoreConfig { } } +// GetServiceRegistration returns the config's ServiceRegistration, or nil if it does +// not exist. +func (c *CoreConfig) GetServiceRegistration() sr.ServiceRegistration { + + // Check whether there is a ServiceRegistration explictly configured + if c.ServiceRegistration != nil { + return c.ServiceRegistration + } + + // Check if HAPhysical is configured and implements ServiceRegistration + if c.HAPhysical != nil && c.HAPhysical.HAEnabled() { + if disc, ok := c.HAPhysical.(sr.ServiceRegistration); ok { + return disc + } + } + + // No service discovery is available. + return nil +} + // NewCore is used to construct a new core func NewCore(conf *CoreConfig) (*Core, error) { if conf.HAPhysical != nil && conf.HAPhysical.HAEnabled() { @@ -651,6 +678,7 @@ func NewCore(conf *CoreConfig) (*Core, error) { entCore: entCore{}, devToken: conf.DevToken, physical: conf.Physical, + serviceRegistration: conf.GetServiceRegistration(), underlyingPhysical: conf.Physical, storageType: conf.StorageType, redirectAddr: conf.RedirectAddr, @@ -1347,13 +1375,10 @@ func (c *Core) unsealInternal(ctx context.Context, masterKey []byte) (bool, erro c.logger.Info("vault is unsealed") } - if c.ha != nil { - sd, ok := c.ha.(physical.ServiceDiscovery) - if ok { - if err := sd.NotifySealedStateChange(); err != nil { - if c.logger.IsWarn() { - c.logger.Warn("failed to notify unsealed status", "error", err) - } + if c.serviceRegistration != nil { + if err := c.serviceRegistration.NotifySealedStateChange(); err != nil { + if c.logger.IsWarn() { + c.logger.Warn("failed to notify unsealed status", "error", err) } } } @@ -1651,13 +1676,10 @@ func (c *Core) sealInternalWithOptions(grabStateLock, keepHALock, shutdownRaft b return err } - if c.ha != nil { - sd, ok := c.ha.(physical.ServiceDiscovery) - if ok { - if err := sd.NotifySealedStateChange(); err != nil { - if c.logger.IsWarn() { - c.logger.Warn("failed to notify sealed status", "error", err) - } + if c.serviceRegistration != nil { + if err := c.serviceRegistration.NotifySealedStateChange(); err != nil { + if c.logger.IsWarn() { + c.logger.Warn("failed to notify sealed status", "error", err) } } } diff --git a/vault/core_test.go b/vault/core_test.go index 1f742af34e77..6c05200dbe4d 100644 --- a/vault/core_test.go +++ b/vault/core_test.go @@ -3,6 +3,7 @@ package vault import ( "context" "reflect" + "sync" "testing" "time" @@ -17,6 +18,7 @@ import ( "github.com/hashicorp/vault/sdk/logical" "github.com/hashicorp/vault/sdk/physical" "github.com/hashicorp/vault/sdk/physical/inmem" + sr "github.com/hashicorp/vault/serviceregistration" ) var ( @@ -2446,3 +2448,114 @@ func TestCore_HandleRequest_TokenCreate_RegisterAuthFailure(t *testing.T) { t.Fatal(err) } } + +// mockServiceRegistration helps test whether standalone ServiceRegistration works +type mockServiceRegistration struct { + notifyActiveCount int + notifySealedCount int + notifyPerfCount int + runDiscoveryCount int +} + +func (m *mockServiceRegistration) NotifyActiveStateChange() error { + m.notifyActiveCount++ + return nil +} + +func (m *mockServiceRegistration) NotifySealedStateChange() error { + m.notifySealedCount++ + return nil +} + +func (m *mockServiceRegistration) NotifyPerformanceStandbyStateChange() error { + m.notifyPerfCount++ + return nil +} + +func (m *mockServiceRegistration) RunServiceRegistration( + _ *sync.WaitGroup, _ sr.ShutdownChannel, _ string, + _ sr.ActiveFunction, _ sr.SealedFunction, + _ sr.PerformanceStandbyFunction) error { + + m.runDiscoveryCount++ + return nil +} + +// TestCore_ServiceRegistration tests whether standalone ServiceRegistration works +func TestCore_ServiceRegistration(t *testing.T) { + + // Make a mock service discovery + sr := &mockServiceRegistration{} + + // Create the core + logger = logging.NewVaultLogger(log.Trace) + inm, err := inmem.NewInmemHA(nil, logger) + if err != nil { + t.Fatal(err) + } + inmha, err := inmem.NewInmemHA(nil, logger) + if err != nil { + t.Fatal(err) + } + const redirectAddr = "http://127.0.0.1:8200" + core, err := NewCore(&CoreConfig{ + ServiceRegistration: sr, + Physical: inm, + HAPhysical: inmha.(physical.HABackend), + RedirectAddr: redirectAddr, + DisableMlock: true, + }) + if err != nil { + t.Fatal(err) + } + + // Vault should not yet be registered + if diff := deep.Equal(sr, &mockServiceRegistration{}); diff != nil { + t.Fatal(diff) + } + + // Run service discovery on the core + wg := &sync.WaitGroup{} + var shutdown chan struct{} + activeFunc := func() bool { + if isLeader, _, _, err := core.Leader(); err == nil { + return isLeader + } + return false + } + err = sr.RunServiceRegistration( + wg, shutdown, redirectAddr, activeFunc, core.Sealed, core.PerfStandby) + if err != nil { + t.Fatal(err) + } + + // Vault should be registered + if diff := deep.Equal(sr, &mockServiceRegistration{ + runDiscoveryCount: 1, + }); diff != nil { + t.Fatal(diff) + } + + // Initialize and unseal the core + keys, _ := TestCoreInit(t, core) + for _, key := range keys { + if _, err := TestCoreUnseal(core, TestKeyCopy(key)); err != nil { + t.Fatalf("unseal err: %s", err) + } + } + if core.Sealed() { + t.Fatal("should not be sealed") + } + + // Wait for core to become active + TestWaitActive(t, core) + + // Vault should be registered, unsealed, and active + if diff := deep.Equal(sr, &mockServiceRegistration{ + runDiscoveryCount: 1, + notifyActiveCount: 1, + notifySealedCount: 1, + }); diff != nil { + t.Fatal(diff) + } +} diff --git a/vault/ha.go b/vault/ha.go index 3e89b1c2c96b..08157a6a4611 100644 --- a/vault/ha.go +++ b/vault/ha.go @@ -924,9 +924,8 @@ func (c *Core) advertiseLeader(ctx context.Context, uuid string, leaderLostCh <- return err } - sd, ok := c.ha.(physical.ServiceDiscovery) - if ok { - if err := sd.NotifyActiveStateChange(); err != nil { + if c.serviceRegistration != nil { + if err := c.serviceRegistration.NotifyActiveStateChange(); err != nil { if c.logger.IsWarn() { c.logger.Warn("failed to notify active status", "error", err) } @@ -960,9 +959,8 @@ func (c *Core) clearLeader(uuid string) error { err := c.barrier.Delete(context.Background(), key) // Advertise ourselves as a standby - sd, ok := c.ha.(physical.ServiceDiscovery) - if ok { - if err := sd.NotifyActiveStateChange(); err != nil { + if c.serviceRegistration != nil { + if err := c.serviceRegistration.NotifyActiveStateChange(); err != nil { if c.logger.IsWarn() { c.logger.Warn("failed to notify standby status", "error", err) } diff --git a/website/source/docs/configuration/service-registration/consul.html.md b/website/source/docs/configuration/service-registration/consul.html.md new file mode 100644 index 000000000000..c724444fa1ca --- /dev/null +++ b/website/source/docs/configuration/service-registration/consul.html.md @@ -0,0 +1,231 @@ +--- +layout: "docs" +page_title: "Consul - Service Registration - Configuration" +sidebar_title: "Consul" +sidebar_current: "docs-configuration-storage-consul" +description: |- + Consul Service Registration registers Vault as a service in Consul with a default + health check. +--- + +# Consul Service Registration + +Consul Service Registration registers Vault as a service in [Consul][consul] with +a default health check. + +- **HashiCorp Supported** – Consul Service Registration is officially supported +by HashiCorp. + +```hcl +service_registration "consul" { + address = "127.0.0.1:8500" +} +``` + +Once properly configured, an unsealed Vault installation should be available and +accessible at: + +```text +active.vault.service.consul +``` + +Unsealed Vault instances in standby mode are available at: + +```text +standby.vault.service.consul +``` + +All unsealed Vault instances are available as healthy at: + +```text +vault.service.consul +``` + +Sealed Vault instances will mark themselves as unhealthy to avoid being returned +at Consul's service discovery layer. + +## `consul` Parameters + +- `address` `(string: "127.0.0.1:8500")` – Specifies the address of the Consul + agent to communicate with. This can be an IP address, DNS record, or unix + socket. It is recommended that you communicate with a local Consul agent; do + not communicate directly with a server. + +- `check_timeout` `(string: "5s")` – Specifies the check interval used to send + health check information back to Consul. This is specified using a label + suffix like `"30s"` or `"1h"`. + +- `disable_registration` `(string: "false")` – Specifies whether Vault should + register itself with Consul. + +- `scheme` `(string: "http")` – Specifies the scheme to use when communicating + with Consul. This can be set to "http" or "https". It is highly recommended + you communicate with Consul over https over non-local connections. When + communicating over a unix socket, this option is ignored. + +- `service` `(string: "vault")` – Specifies the name of the service to register + in Consul. + +- `service_tags` `(string: "")` – Specifies a comma-separated list of tags to + attach to the service registration in Consul. + +- `service_address` `(string: nil)` – Specifies a service-specific address to + set on the service registration in Consul. If unset, Vault will use what it + knows to be the HA redirect address - which is usually desirable. Setting + this parameter to `""` will tell Consul to leverage the configuration of the + node the service is registered on dynamically. This could be beneficial if + you intend to leverage Consul's + [`translate_wan_addrs`][consul-translate-wan-addrs] parameter. + +- `token` `(string: "")` – Specifies the [Consul ACL token][consul-acl] with + permission to read and write from the `path` in Consul's key-value store. + This is **not** a Vault token. See the ACL section below for help. + +The following settings apply when communicating with Consul via an encrypted +connection. You can read more about encrypting Consul connections on the +[Consul encryption page][consul-encryption]. + +- `tls_ca_file` `(string: "")` – Specifies the path to the CA certificate used + for Consul communication. This defaults to system bundle if not specified. + This should be set according to the + [`ca_file`](https://www.consul.io/docs/agent/options.html#ca_file) setting in + Consul. + +- `tls_cert_file` `(string: "")` (optional) – Specifies the path to the + certificate for Consul communication. This should be set according to the + [`cert_file`](https://www.consul.io/docs/agent/options.html#cert_file) setting + in Consul. + +- `tls_key_file` `(string: "")` – Specifies the path to the private key for + Consul communication. This should be set according to the + [`key_file`](https://www.consul.io/docs/agent/options.html#key_file) setting + in Consul. + +- `tls_min_version` `(string: "tls12")` – Specifies the minimum TLS version to + use. Accepted values are `"tls10"`, `"tls11"` or `"tls12"`. + +- `tls_skip_verify` `(string: "false")` – Disable verification of TLS certificates. + Using this option is highly discouraged. + +## ACLs + +If using ACLs in Consul, you'll need appropriate permissions. For Consul 0.8, +the following will work for most use-cases, assuming that your service name is +`vault` and the prefix being used is `vault/`: + +```json +{ + "key": { + "vault/": { + "policy": "write" + } + }, + "node": { + "": { + "policy": "write" + } + }, + "service": { + "vault": { + "policy": "write" + } + }, + "agent": { + "": { + "policy": "write" + } + + }, + "session": { + "": { + "policy": "write" + } + } +} +``` + +For Consul 1.4+, the following example takes into account the changed ACL +language: + +```json +{ + "key_prefix": { + "vault/": { + "policy": "write" + } + }, + "node_prefix": { + "": { + "policy": "write" + } + }, + "service": { + "vault": { + "policy": "write" + } + }, + "agent_prefix": { + "": { + "policy": "write" + } + + }, + "session_prefix": { + "": { + "policy": "write" + } + } +} +``` + +## `consul` Examples + +### Local Agent + +This example shows a sample configuration which communicates with a local +Consul agent running on `127.0.0.1:8500`. + +```hcl +service_registration "consul" {} +``` + +### Detailed Customization + +This example shows communicating with Consul on a custom address with an ACL +token. + +```hcl +service_registration "consul" { + address = "10.5.7.92:8194" + token = "abcd1234" +} +``` + +### Consul via Unix Socket + +This example shows communicating with Consul over a local unix socket. + +```hcl +service_registration "consul" { + address = "unix:///tmp/.consul.http.sock" +} +``` + +### Custom TLS + +This example shows using a custom CA, certificate, and key file to securely +communicate with Consul over TLS. + +```hcl +service_registration "consul" { + scheme = "https" + tls_ca_file = "/etc/pem/vault.ca" + tls_cert_file = "/etc/pem/vault.cert" + tls_key_file = "/etc/pem/vault.key" +} +``` + +[consul]: https://www.consul.io/ "Consul by HashiCorp" +[consul-acl]: https://www.consul.io/docs/guides/acl.html "Consul ACLs" +[consul-encryption]: https://www.consul.io/docs/agent/encryption.html "Consul Encryption" +[consul-translate-wan-addrs]: https://www.consul.io/docs/agent/options.html#translate_wan_addrs "Consul Configuration" diff --git a/website/source/docs/configuration/service-registration/index.html.md b/website/source/docs/configuration/service-registration/index.html.md new file mode 100644 index 000000000000..484c378396a2 --- /dev/null +++ b/website/source/docs/configuration/service-registration/index.html.md @@ -0,0 +1,67 @@ +--- +layout: "docs" +page_title: "Service Registration - Configuration" +sidebar_title: "service_registration" +sidebar_current: "docs-configuration-serviceDiscovery" +description: |- + The optional `service_registration` stanza configures Vault's mechanism for + service registration. +--- + +# `service_registration` Stanza + +The optional `service_registration` stanza configures Vault's mechanism for +service registration. The `service_registration` stanza is designed for use cases +where you would like to use a system like [Consul][consul] for [service +discovery][consul-discovery], but use a different system for the [storage +backend][storage-backend]. + +When Consul is configured as the [storage backend][consul-backend], Vault +implicitly uses Consul for service registration, so the `service_registration` stanza +is not needed. + +For times when you would like to use a different storage backend, like +[Raft][raft-backend], but still have service registration available, the +`service_registration` stanza can be used: + +```hcl +service_registration "consul" { + address = "127.0.0.1:8500" +} +storage "raft" { + path = "/path/to/raft/data" + node_id = "raft_node_1" +} +``` + +For information about a specific service registration provider, choose one from +the navigation on the left. + +## Configuration + +Service registration configuration is done through the Vault configuration file +using the `service_registration` stanza: + +```hcl +service_registration [NAME] { + [PARAMETERS...] +} +``` + +For example: + +```hcl +service_registration "consul" { + address = "127.0.0.1:8500" +} +``` + +For configuration options which also read an environment variable, the +environment variable will take precedence over values in the configuration +file. + +[consul]: https://www.consul.io/ +[consul-discovery]: https://www.consul.io/discovery.html +[storage-backend]: /docs/configuration/storage/index.html +[consul-backend]: /docs/configuration/storage/consul.html +[raft-backend]: /docs/configuration/storage/raft.html diff --git a/website/source/layouts/docs.erb b/website/source/layouts/docs.erb index 4e33a4141c7c..bc06779ba106 100644 --- a/website/source/layouts/docs.erb +++ b/website/source/layouts/docs.erb @@ -78,6 +78,11 @@ 'swift', 'zookeeper' ] + }, { + category: 'service-registration', + content: [ + 'consul', + ] }, 'telemetry', { category: 'ui' },