Commit 49145df

Merge pull request #8352 from hashicorp/backport/8311

mkeeler authored Jul 21, 2020
2 parents 8513ad5 + a8d2e5a commit 49145df

Showing 15 changed files with 1,441 additions and 235 deletions.
198 changes: 32 additions & 166 deletions agent/agent.go
@@ -32,6 +32,7 @@ import (
autoconf "github.com/hashicorp/consul/agent/auto-config"
"github.com/hashicorp/consul/agent/cache"
cachetype "github.com/hashicorp/consul/agent/cache-types"
certmon "github.com/hashicorp/consul/agent/cert-monitor"
"github.com/hashicorp/consul/agent/checks"
"github.com/hashicorp/consul/agent/config"
"github.com/hashicorp/consul/agent/consul"
@@ -166,6 +167,8 @@ type notifier interface {
type Agent struct {
autoConf *autoconf.AutoConfig

certMonitor *certmon.CertMonitor

// config is the agent configuration.
config *config.RuntimeConfig

@@ -717,17 +720,39 @@ func (a *Agent) Start(ctx context.Context) error {
a.registerCache()

if a.config.AutoEncryptTLS && !a.config.ServerMode {
reply, err := a.setupClientAutoEncrypt(ctx)
reply, err := a.autoEncryptInitialCertificate(ctx)
if err != nil {
return fmt.Errorf("AutoEncrypt failed: %s", err)
}
rootsReq, leafReq, err := a.setupClientAutoEncryptCache(reply)

cmConfig := new(certmon.Config).
WithCache(a.cache).
WithLogger(a.logger.Named(logging.AutoEncrypt)).
WithTLSConfigurator(a.tlsConfigurator).
WithTokens(a.tokens).
WithFallback(a.autoEncryptInitialCertificate).
WithDNSSANs(a.config.AutoEncryptDNSSAN).
WithIPSANs(a.config.AutoEncryptIPSAN).
WithDatacenter(a.config.Datacenter).
WithNodeName(a.config.NodeName)

monitor, err := certmon.New(cmConfig)
if err != nil {
return fmt.Errorf("AutoEncrypt failed: %s", err)
return fmt.Errorf("AutoEncrypt failed to setup certificate monitor: %w", err)
}
if err = a.setupClientAutoEncryptWatching(rootsReq, leafReq); err != nil {
return fmt.Errorf("AutoEncrypt failed: %s", err)
if err := monitor.Update(reply); err != nil {
return fmt.Errorf("AutoEncrypt failed to setup certificate monitor: %w", err)
}
a.certMonitor = monitor

// We never need to call Stop explicitly: the goroutines are tied to the
// agent's lifetime via the StopCh. The agent also has no need to confirm
// that the goroutines have stopped before acting, so the returned channel
// can be ignored.
if _, err := a.certMonitor.Start(&lib.StopChannelContext{StopCh: a.shutdownCh}); err != nil {
return fmt.Errorf("AutoEncrypt failed to start certificate monitor: %w", err)
}

a.logger.Info("automatically upgraded to TLS")
}

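The new wiring relies on lib.StopChannelContext to adapt the agent's bare shutdown channel to the context.Context interface that the monitor's Start method expects, which is what ties the monitor's goroutines to the agent's lifetime as the comment above describes. A minimal sketch of how such a channel-backed context can be implemented, following the shape implied by the call site rather than the exact library code:

package lib

import (
	"context"
	"time"
)

// StopChannelContext adapts a bare stop channel to context.Context so
// channel-based shutdown can drive context-aware APIs.
type StopChannelContext struct {
	StopCh <-chan struct{}
}

// Deadline reports no deadline; cancellation comes only from StopCh.
func (c *StopChannelContext) Deadline() (deadline time.Time, ok bool) {
	return deadline, false
}

// Done returns the stop channel itself; it closes on agent shutdown.
func (c *StopChannelContext) Done() <-chan struct{} {
	return c.StopCh
}

// Err reports Canceled once the stop channel has been closed.
func (c *StopChannelContext) Err() error {
	select {
	case <-c.StopCh:
		return context.Canceled
	default:
		return nil
	}
}

// Value carries no values; it exists only to satisfy the interface.
func (c *StopChannelContext) Value(key interface{}) interface{} {
	return nil
}

Note also that the new error paths wrap with %w rather than formatting with %s, so callers can inspect the underlying cause with errors.Is and errors.As.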
@@ -830,7 +855,7 @@ func (a *Agent) Start(ctx context.Context) error {
return nil
}

func (a *Agent) setupClientAutoEncrypt(ctx context.Context) (*structs.SignedResponse, error) {
func (a *Agent) autoEncryptInitialCertificate(ctx context.Context) (*structs.SignedResponse, error) {
client := a.delegate.(*consul.Client)

addrs := a.config.StartJoinAddrsLAN
@@ -840,166 +865,7 @@ func (a *Agent) setupClientAutoEncrypt(ctx context.Context) (*structs.SignedResponse, error) {
}
addrs = append(addrs, retryJoinAddrs(disco, retryJoinSerfVariant, "LAN", a.config.RetryJoinLAN, a.logger)...)

reply, priv, err := client.RequestAutoEncryptCerts(ctx, addrs, a.config.ServerPort, a.tokens.AgentToken(), a.config.AutoEncryptDNSSAN, a.config.AutoEncryptIPSAN)
if err != nil {
return nil, err
}

connectCAPems := []string{}
for _, ca := range reply.ConnectCARoots.Roots {
connectCAPems = append(connectCAPems, ca.RootCert)
}
if err := a.tlsConfigurator.UpdateAutoEncrypt(reply.ManualCARoots, connectCAPems, reply.IssuedCert.CertPEM, priv, reply.VerifyServerHostname); err != nil {
return nil, err
}
return reply, nil

}

func (a *Agent) setupClientAutoEncryptCache(reply *structs.SignedResponse) (*structs.DCSpecificRequest, *cachetype.ConnectCALeafRequest, error) {
rootsReq := &structs.DCSpecificRequest{
Datacenter: a.config.Datacenter,
QueryOptions: structs.QueryOptions{Token: a.tokens.AgentToken()},
}

// prepopulate roots cache
rootRes := cache.FetchResult{Value: &reply.ConnectCARoots, Index: reply.ConnectCARoots.QueryMeta.Index}
if err := a.cache.Prepopulate(cachetype.ConnectCARootName, rootRes, a.config.Datacenter, a.tokens.AgentToken(), rootsReq.CacheInfo().Key); err != nil {
return nil, nil, err
}

leafReq := &cachetype.ConnectCALeafRequest{
Datacenter: a.config.Datacenter,
Token: a.tokens.AgentToken(),
Agent: a.config.NodeName,
DNSSAN: a.config.AutoEncryptDNSSAN,
IPSAN: a.config.AutoEncryptIPSAN,
}

// prepopulate leaf cache
certRes := cache.FetchResult{
Value: &reply.IssuedCert,
Index: reply.ConnectCARoots.QueryMeta.Index,
}

for _, ca := range reply.ConnectCARoots.Roots {
if ca.ID == reply.ConnectCARoots.ActiveRootID {
certRes.State = cachetype.ConnectCALeafSuccess(ca.SigningKeyID)
break
}
}
if err := a.cache.Prepopulate(cachetype.ConnectCALeafName, certRes, a.config.Datacenter, a.tokens.AgentToken(), leafReq.Key()); err != nil {
return nil, nil, err
}
return rootsReq, leafReq, nil
}

func (a *Agent) setupClientAutoEncryptWatching(rootsReq *structs.DCSpecificRequest, leafReq *cachetype.ConnectCALeafRequest) error {
// setup watches
ch := make(chan cache.UpdateEvent, 10)
ctx, cancel := context.WithCancel(context.Background())

// Watch for root changes
err := a.cache.Notify(ctx, cachetype.ConnectCARootName, rootsReq, rootsWatchID, ch)
if err != nil {
cancel()
return err
}

// Watch the leaf cert
err = a.cache.Notify(ctx, cachetype.ConnectCALeafName, leafReq, leafWatchID, ch)
if err != nil {
cancel()
return err
}

// Set up actions for when the watches fire.
go func() {
for {
select {
case <-a.shutdownCh:
cancel()
return
case <-ctx.Done():
return
case u := <-ch:
switch u.CorrelationID {
case rootsWatchID:
roots, ok := u.Result.(*structs.IndexedCARoots)
if !ok {
err := fmt.Errorf("invalid type for roots response: %T", u.Result)
a.logger.Error("watch error for correlation id",
"correlation_id", u.CorrelationID,
"error", err,
)
continue
}
pems := []string{}
for _, root := range roots.Roots {
pems = append(pems, root.RootCert)
}
a.tlsConfigurator.UpdateAutoEncryptCA(pems)
case leafWatchID:
leaf, ok := u.Result.(*structs.IssuedCert)
if !ok {
err := fmt.Errorf("invalid type for leaf response: %T", u.Result)
a.logger.Error("watch error for correlation id",
"correlation_id", u.CorrelationID,
"error", err,
)
continue
}
a.tlsConfigurator.UpdateAutoEncryptCert(leaf.CertPEM, leaf.PrivateKeyPEM)
}
}
}
}()

// Set up a safety net in case the auto_encrypt cert doesn't get renewed
// in time. The agent would otherwise be stuck, because the watches
// never use the AutoEncrypt.Sign endpoint.
go func() {
for {

// Check 10 seconds after the cert expires. The agent cache
// should have handled the expiration and renewal before then.
// If there is no cert, AutoEncryptCertNotAfter returns a value
// in the past, which triggers the renewal immediately; that
// shouldn't happen here because auto_encrypt has just been
// set up successfully.
autoLogger := a.logger.Named(logging.AutoEncrypt)
interval := a.tlsConfigurator.AutoEncryptCertNotAfter().Sub(time.Now().Add(10 * time.Second))
a.logger.Debug("setting up client certificate expiration check on interval", "interval", interval)
select {
case <-a.shutdownCh:
return
case <-time.After(interval):
// check auto encrypt client cert expiration
if a.tlsConfigurator.AutoEncryptCertExpired() {
autoLogger.Debug("client certificate expired.")
// Background because the context is mainly useful when the agent is first starting up.
reply, err := a.setupClientAutoEncrypt(context.Background())
if err != nil {
autoLogger.Error("client certificate expired, failed to renew", "error", err)
// in case of an error, try again in one minute
interval = time.Minute
continue
}
_, _, err = a.setupClientAutoEncryptCache(reply)
if err != nil {
autoLogger.Error("client certificate expired, failed to populate cache", "error", err)
// in case of an error, try again in one minute
interval = time.Minute
continue
}
}
}
}
}()

return nil
return client.RequestAutoEncryptCerts(ctx, addrs, a.config.ServerPort, a.tokens.AgentToken(), a.config.AutoEncryptDNSSAN, a.config.AutoEncryptIPSAN)
}

func (a *Agent) listenAndServeGRPC() error {
42 changes: 15 additions & 27 deletions agent/agent_endpoint_test.go
@@ -5209,6 +5209,8 @@ func TestAgentConnectCALeafCert_good(t *testing.T) {
assert.Equal(fmt.Sprintf("%d", issued.ModifyIndex),
resp.Header().Get("X-Consul-Index"))

index := resp.Header().Get("X-Consul-Index")

// Test caching
{
// Fetch it again
@@ -5221,39 +5223,25 @@
require.Equal("HIT", resp.Header().Get("X-Cache"))
}

// Test that caching is updated in the background
// Issue a blocking query to ensure that the cert gets updated appropriately
{
// Set a new CA
ca := connect.TestCAConfigSet(t, a, nil)

retry.Run(t, func(r *retry.R) {
resp := httptest.NewRecorder()
// Try and sign again (note no index/wait arg since cache should update in
// background even if we aren't actively blocking)
obj, err := a.srv.AgentConnectCALeafCert(resp, req)
r.Check(err)

issued2 := obj.(*structs.IssuedCert)
if issued.CertPEM == issued2.CertPEM {
r.Fatalf("leaf has not updated")
}

// Got a new leaf. Sanity check it's a whole new key as well as different
// cert.
if issued.PrivateKeyPEM == issued2.PrivateKeyPEM {
r.Fatalf("new leaf has same private key as before")
}
resp := httptest.NewRecorder()
req, _ := http.NewRequest("GET", "/v1/agent/connect/ca/leaf/test?index="+index, nil)
obj, err := a.srv.AgentConnectCALeafCert(resp, req)
require.NoError(err)
issued2 := obj.(*structs.IssuedCert)
require.NotEqual(issued.CertPEM, issued2.CertPEM)
require.NotEqual(issued.PrivateKeyPEM, issued2.PrivateKeyPEM)

// Verify that the cert is signed by the new CA
requireLeafValidUnderCA(t, issued2, ca)
// Verify that the cert is signed by the new CA
requireLeafValidUnderCA(t, issued2, ca)

// Should be a cache hit! The data should've updated in the cache
// in the background so this should've been fetched directly from
// the cache.
if resp.Header().Get("X-Cache") != "HIT" {
r.Fatalf("should be a cache hit")
}
})
// Should not be a cache hit! The data was updated in response to the blocking
// query being made.
require.Equal("MISS", resp.Header().Get("X-Cache"))
}
}

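The rewritten test exercises Consul's standard blocking-query contract: a client echoes the last X-Consul-Index header back as the ?index= query parameter, and the request parks until the data advances past that index. A minimal client-side sketch of the same pattern; the poll loop and helper name are hypothetical, not part of this diff:

package main

import (
	"fmt"
	"io"
	"net/http"
)

// watchLeafCert long-polls the agent's leaf-cert endpoint, passing the last
// X-Consul-Index back as ?index= so each request blocks until the cert
// changes. Hypothetical helper for illustration only.
func watchLeafCert(baseURL, service string, onChange func(body []byte)) error {
	index := "0"
	for {
		url := fmt.Sprintf("%s/v1/agent/connect/ca/leaf/%s?index=%s", baseURL, service, index)
		resp, err := http.Get(url)
		if err != nil {
			return err
		}
		body, err := io.ReadAll(resp.Body)
		resp.Body.Close()
		if err != nil {
			return err
		}
		// Only report a change when the index actually advances.
		if newIndex := resp.Header.Get("X-Consul-Index"); newIndex != "" && newIndex != index {
			index = newIndex
			onChange(body)
		}
	}
}

func main() {
	_ = watchLeafCert("http://127.0.0.1:8500", "test", func(b []byte) {
		fmt.Printf("leaf cert updated (%d bytes)\n", len(b))
	})
}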
2 changes: 1 addition & 1 deletion agent/cache-types/connect_ca_leaf.go
@@ -50,7 +50,7 @@ const caChangeJitterWindow = 30 * time.Second
// ConnectCALeaf supports fetching and generating Connect leaf
// certificates.
type ConnectCALeaf struct {
RegisterOptionsBlockingRefresh
RegisterOptionsBlockingNoRefresh
caIndex uint64 // Current index for CA roots

// rootWatchMu protects access to the rootWatchSubscribers map and
12 changes: 11 additions & 1 deletion agent/cache-types/options.go
@@ -18,7 +18,17 @@ func (r RegisterOptionsBlockingRefresh) RegisterOptions() cache.RegisterOptions {
Refresh: true,
SupportsBlocking: true,
RefreshTimer: 0 * time.Second,
RefreshTimeout: 10 * time.Minute,
QueryTimeout: 10 * time.Minute,
}
}

type RegisterOptionsBlockingNoRefresh struct{}

func (r RegisterOptionsBlockingNoRefresh) RegisterOptions() cache.RegisterOptions {
return cache.RegisterOptions{
Refresh: false,
SupportsBlocking: true,
QueryTimeout: 10 * time.Minute,
}
}

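These option structs are meant to be embedded: a cache type picks its registration policy by embedding one of them and inherits the RegisterOptions method, which is how ConnectCALeaf switches from background-refreshed to blocking-only in the hunk above. A minimal sketch under that assumption; MyType and its Fetch body are hypothetical:

// MyType is a hypothetical cacheable type in this package. Embedding
// RegisterOptionsBlockingNoRefresh supplies RegisterOptions, so MyType
// supports blocking queries but is never refreshed in the background.
type MyType struct {
	RegisterOptionsBlockingNoRefresh
}

func (t *MyType) Fetch(opts cache.FetchOptions, req cache.Request) (cache.FetchResult, error) {
	// Honor opts.MinIndex and opts.Timeout while fetching from the
	// servers (elided).
	return cache.FetchResult{}, nil
}

Swapping one embedded struct for another is then the entire change needed in connect_ca_leaf.go.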
15 changes: 9 additions & 6 deletions agent/cache/cache.go
@@ -171,7 +171,7 @@ type RegisterOptions struct {
// If this is zero, then data is refreshed immediately when a fetch
// is returned.
//
// RefreshTimeout determines the maximum query time for a refresh
// QueryTimeout determines the maximum query time for a blocking query
// operation. This is specified as part of the query options and is
// expected to be implemented by the Type itself.
//
@@ -184,8 +184,8 @@ type RegisterOptions struct {
// refresh can be set so that changes in server data are recognized
// within the cache very quickly.
//
RefreshTimer time.Duration
RefreshTimeout time.Duration
RefreshTimer time.Duration
QueryTimeout time.Duration
}

// RegisterType registers a cacheable type.
@@ -473,8 +473,7 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
// keepalives are every 30 seconds so the RPC should fail if the packets are
// being blackholed for more than 30 seconds.
var connectedTimer *time.Timer
if tEntry.Opts.Refresh && entry.Index > 0 &&
tEntry.Opts.RefreshTimeout > (31*time.Second) {
if tEntry.Opts.Refresh && entry.Index > 0 && tEntry.Opts.QueryTimeout > 31*time.Second {
connectedTimer = time.AfterFunc(31*time.Second, func() {
c.entriesLock.Lock()
defer c.entriesLock.Unlock()
Expand All @@ -490,7 +489,11 @@ func (c *Cache) fetch(key string, r getOptions, allowNew bool, attempt uint, ign
fOpts := FetchOptions{}
if tEntry.Opts.SupportsBlocking {
fOpts.MinIndex = entry.Index
fOpts.Timeout = tEntry.Opts.RefreshTimeout
fOpts.Timeout = tEntry.Opts.QueryTimeout

if fOpts.Timeout == 0 {
fOpts.Timeout = 10 * time.Minute
}
}
if entry.Valid {
fOpts.LastResult = &FetchResult{
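Two behaviors in this hunk stand out. First, a blocking fetch whose registered QueryTimeout is zero now gets an explicit 10-minute default rather than relying on each type to bound it. Second, the 31-second timer is a liveness heuristic: keepalives arrive every 30 seconds, so a blocking RPC that survives past that window implies a healthy connection. A standalone sketch of that timer pattern, with hypothetical names and none of the cache's internal locking:

package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var connected atomic.Bool

	// Keepalives are expected every 30s, so if the blocking call below
	// survives 31s without failing, mark the connection as healthy.
	timer := time.AfterFunc(31*time.Second, func() { connected.Store(true) })
	defer timer.Stop()

	// ... issue the long blocking fetch here; if it returns with an
	// error before the timer fires, the connection never proved itself ...

	fmt.Println("connected:", connected.Load())
}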