Skip to content

Commit

Permalink
fix vault retry logic on failed calls
Browse files Browse the repository at this point in the history
The original problem was that when it had trouble fetching a
non-renewable vault secret, it would wait the standard exponential
backoff time plus the configured sleep time (the sleep it normally uses
between successful fetches). What it should do instead is use the sleep
time between successful fetches and exponential backoff on failures.

While fixing this I cleaned up the code to make the logic more clear.
The issue existed in both vault_read and vault_write, and they shared a
common chunk of renew logic between them and with vault_token. So I
refactored that out into a common function.

Fixes #1224
  • Loading branch information
eikenb committed Aug 27, 2019
1 parent d00a92a commit 56949b1
Show file tree
Hide file tree
Showing 6 changed files with 139 additions and 139 deletions.
40 changes: 39 additions & 1 deletion dependency/vault_common.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

var (
// VaultDefaultLeaseDuration is the default lease duration (a time.Duration).
VaultDefaultLeaseDuration = 5 * time.Minute
VaultDefaultLeaseDuration = 10 * time.Second
)

// Secret is the structure returned for every secret within Vault.
Expand Down Expand Up @@ -64,6 +64,44 @@ type SecretWrapInfo struct {
WrappedAccessor string
}

// renewer is the interface a Vault-backed dependency implements so that
// renewSecret can run the shared lease-renewal loop on its behalf.
type renewer interface {
Dependency
// stopChan returns the channel renewSecret selects on to abort renewal.
stopChan() chan struct{}
// secrets returns the dependency's Secret together with the raw Vault API
// secret that the renewer is created from.
secrets() (*Secret, *api.Secret)
}

// renewSecret runs the Vault lease-renewal loop for the dependency d and
// blocks until renewal ends.
//
// It returns:
//   - the error from NewRenewer if the renewer could not be created,
//   - nil once the renewer reports it is done (a renewal error at that point
//     is only logged, not returned),
//   - ErrStopped if d's stop channel fires first.
//
// Each successful renewal is folded into d's Secret via updateSecret.
func renewSecret(clients *ClientSet, d renewer) error {
	log.Printf("[TRACE] %s: starting renewer", d)

	exposed, raw := d.secrets()
	input := &api.RenewerInput{Secret: raw}
	r, err := clients.Vault().NewRenewer(input)
	if err != nil {
		return err
	}

	// Renew runs in the background; make sure it is stopped however we leave.
	go r.Renew()
	defer r.Stop()

	for {
		select {
		case <-d.stopChan():
			return ErrStopped

		case renewed := <-r.RenewCh():
			log.Printf("[TRACE] %s: successfully renewed", d)
			printVaultWarnings(d, renewed.Secret.Warnings)
			updateSecret(exposed, renewed.Secret)

		case doneErr := <-r.DoneCh():
			// The renewer has given up; the caller is expected to fetch a
			// fresh secret, so the failure is reported in the log only.
			if doneErr != nil {
				log.Printf("[WARN] %s: failed to renew: %s", d, doneErr)
			}
			log.Printf("[WARN] %s: renewer done (maybe the lease expired)", d)
			return nil
		}
	}
}

// vaultRenewDuration accepts a secret and returns the recommended amount of
// time to sleep.
func vaultRenewDuration(s *Secret) time.Duration {
Expand Down
90 changes: 39 additions & 51 deletions dependency/vault_read.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ var (

// VaultReadQuery is the dependency to Vault for a secret
type VaultReadQuery struct {
stopCh chan struct{}
stopCh chan struct{}
sleepCh <-chan time.Time

rawPath string
queryValues url.Values
Expand All @@ -45,6 +46,7 @@ func NewVaultReadQuery(s string) (*VaultReadQuery, error) {

return &VaultReadQuery{
stopCh: make(chan struct{}, 1),
sleepCh: make(chan time.Time, 1),
rawPath: secretURL.Path,
queryValues: secretURL.Query(),
}, nil
Expand All @@ -56,70 +58,56 @@ func (d *VaultReadQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interfac
case <-d.stopCh:
return nil, nil, ErrStopped
default:
select {
case <-d.sleepCh:
default:
}
}

opts = opts.Merge(&QueryOptions{})
firstRun := d.secret == nil

if d.secret != nil {
if vaultSecretRenewable(d.secret) {
log.Printf("[TRACE] %s: starting renewer", d)

renewer, err := clients.Vault().NewRenewer(&api.RenewerInput{
Grace: opts.VaultGrace,
Secret: d.vaultSecret,
})
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}
go renewer.Renew()
defer renewer.Stop()

RENEW:
for {
select {
case err := <-renewer.DoneCh():
if err != nil {
log.Printf("[WARN] %s: failed to renew: %s", d, err)
}
log.Printf("[WARN] %s: renewer returned (maybe the lease expired)", d)
break RENEW
case renewal := <-renewer.RenewCh():
log.Printf("[TRACE] %s: successfully renewed", d)
printVaultWarnings(d, renewal.Secret.Warnings)
updateSecret(d.secret, renewal.Secret)
case <-d.stopCh:
return nil, nil, ErrStopped
}
}
} else {
// The secret isn't renewable, probably the generic secret backend.
dur := vaultRenewDuration(d.secret)
log.Printf("[TRACE] %s: secret is not renewable, sleeping for %s", d, dur)
select {
case <-time.After(dur):
// The lease is almost expired, it's time to request a new one.
case <-d.stopCh:
return nil, nil, ErrStopped
}
if !firstRun && vaultSecretRenewable(d.secret) {
err := renewSecret(clients, d)
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}
}

// We don't have a secret, or the prior renewal failed
vaultSecret, err := d.readSecret(clients, opts)
err := d.fetchSecret(clients, opts)
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}

// Print any warnings
printVaultWarnings(d, vaultSecret.Warnings)

// Create the cloned secret which will be exposed to the template.
d.vaultSecret = vaultSecret
d.secret = transformSecret(vaultSecret)
if !vaultSecretRenewable(d.secret) {
dur := vaultRenewDuration(d.secret)
log.Printf("[TRACE] %s: non-renewable secret, set sleep for %s", d, dur)
d.sleepCh = time.After(dur)
}

return respWithMetadata(d.secret)
}

// fetchSecret reads the secret from Vault and, on success, records both the
// raw Vault response and its transformed copy on the query. If the read
// fails, nothing is stored and the read error is returned as-is.
func (d *VaultReadQuery) fetchSecret(clients *ClientSet, opts *QueryOptions) error {
	merged := opts.Merge(&QueryOptions{})

	raw, err := d.readSecret(clients, merged)
	if err != nil {
		return err
	}

	printVaultWarnings(d, raw.Warnings)
	d.vaultSecret = raw
	// Keep a transformed copy; this is what gets exposed to the template.
	d.secret = transformSecret(raw)
	return nil
}

// stopChan returns the query's stop channel. It is part of the renewer
// interface, letting renewSecret abort when the query is stopped.
func (d *VaultReadQuery) stopChan() chan struct{} {
return d.stopCh
}

// secrets returns the query's template-facing secret and the raw Vault API
// secret it was transformed from. It is part of the renewer interface.
func (d *VaultReadQuery) secrets() (*Secret, *api.Secret) {
return d.secret, d.vaultSecret
}

// CanShare returns if this dependency is shareable.
func (d *VaultReadQuery) CanShare() bool {
return false
Expand Down
11 changes: 9 additions & 2 deletions dependency/vault_read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ func TestNewVaultReadQuery(t *testing.T) {

if act != nil {
act.stopCh = nil
act.sleepCh = nil
}

assert.Equal(t, tc.exp, act)
Expand Down Expand Up @@ -170,7 +171,10 @@ func TestVaultReadQuery_Fetch_KVv1(t *testing.T) {
errCh <- err
return
}
dataCh <- data
select {
case dataCh <- data:
case <-d.stopCh:
}
}
}()

Expand Down Expand Up @@ -372,7 +376,10 @@ func TestVaultReadQuery_Fetch_KVv2(t *testing.T) {
errCh <- err
return
}
dataCh <- data
select {
case dataCh <- data:
case <-d.stopCh:
}
}
}()

Expand Down
43 changes: 10 additions & 33 deletions dependency/vault_token.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"time"

"github.com/hashicorp/vault/api"
"github.com/pkg/errors"
)

var (
Expand Down Expand Up @@ -44,43 +43,14 @@ func (d *VaultTokenQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interfa
default:
}

opts = opts.Merge(&QueryOptions{})

if vaultSecretRenewable(d.secret) {
log.Printf("[TRACE] %s: starting renewer", d)

renewer, err := clients.Vault().NewRenewer(&api.RenewerInput{
Grace: opts.VaultGrace,
Secret: d.vaultSecret,
})
if err != nil {
return nil, nil, errors.Wrap(err, d.String())
}
go renewer.Renew()
defer renewer.Stop()

RENEW:
for {
select {
case err := <-renewer.DoneCh():
if err != nil {
log.Printf("[WARN] %s: failed to renew: %s", d, err)
}
log.Printf("[WARN] %s: renewer returned (maybe the lease expired)", d)
break RENEW
case renewal := <-renewer.RenewCh():
log.Printf("[TRACE] %s: successfully renewed", d)
printVaultWarnings(d, renewal.Secret.Warnings)
updateSecret(d.secret, renewal.Secret)
case <-d.stopCh:
return nil, nil, ErrStopped
}
}
renewSecret(clients, d)
}

// The secret isn't renewable, probably the generic secret backend.
// TODO This is incorrect when given a non-renewable template. We should
// instead do a lookup-self to determine the lease duration.
opts = opts.Merge(&QueryOptions{})
dur := vaultRenewDuration(d.secret)
if dur < opts.VaultGrace {
dur = opts.VaultGrace
Expand All @@ -89,14 +59,21 @@ func (d *VaultTokenQuery) Fetch(clients *ClientSet, opts *QueryOptions) (interfa
log.Printf("[TRACE] %s: token is not renewable, sleeping for %s", d, dur)
select {
case <-time.After(dur):
// The lease is almost expired, it's time to request a new one.
case <-d.stopCh:
return nil, nil, ErrStopped
}

return nil, nil, ErrLeaseExpired
}

// stopChan returns the query's stop channel. It is part of the renewer
// interface, letting renewSecret abort when the query is stopped.
func (d *VaultTokenQuery) stopChan() chan struct{} {
return d.stopCh
}

// secrets returns the query's stored secret and its underlying Vault API
// secret. It is part of the renewer interface.
func (d *VaultTokenQuery) secrets() (*Secret, *api.Secret) {
return d.secret, d.vaultSecret
}

// CanShare returns if this dependency is shareable.
func (d *VaultTokenQuery) CanShare() bool {
return false
Expand Down
Loading

0 comments on commit 56949b1

Please sign in to comment.