Skip to content

Commit

Permalink
vault: always renew tokens using the renewal loop (#18998)
Browse files Browse the repository at this point in the history
Previously, a Vault token could renewed either periodically via the
renewal loop or immediately by calling `RenewToken()`.

But a race condition in the renewal loop could cause an attempt to renew
an expired token. If both `updateCh` and `renewalCh` are active (such as
when a task stops at the same time its token is waiting for renewal),
the following `select` picks a `case` at random.

https://github.com/hashicorp/nomad/blob/78f0c6b2a910c25522ac88ff63343bb371d72bff/client/vaultclient/vaultclient.go#L557-L564

If `case <-renewalCh` is picked, the token is incorrectly re-added to
the heap, causing unnecessary renewals of a token that is already expired.

https://github.com/hashicorp/nomad/blob/1604dba508b2ae8187b6b9405ce6d9b6ca8e8b4b/client/vaultclient/vaultclient.go#L505-L510

To prevent this situation, the `renew()` function should only renew
tokens that are currently in the heap, so `RenewToken()` must first push
the token to the heap and wait for the renewal to happen instead of
calling `renew()` directly since this could cause another race condition
where the token is renewed twice: once by `RenewToken()` calling
`renew()` directly and a second time if the renewal happens to pick the
token as soon as `RenewToken()` adds it to the heap.
  • Loading branch information
lgfa29 authored Nov 8, 2023
1 parent 783572d commit 7054fe1
Showing 1 changed file with 93 additions and 69 deletions.
162 changes: 93 additions & 69 deletions client/vaultclient/vaultclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ type vaultClient struct {
// vaultClientRenewalRequest is a request object for renewal of both tokens and
// secret's leases.
type vaultClientRenewalRequest struct {
// renewalLoopCh is used to notify listeners every time the token goes
// through the renewal loop. It does not guarantee the renewal was
// successful, so listeners should also read from errCh for renewal errors.
renewalLoopCh chan struct{}

// errCh is the channel into which any renewal error will be sent to
errCh chan error

Expand Down Expand Up @@ -341,13 +346,15 @@ func (c *vaultClient) GetConsulACL(token, path string) (*vaultapi.Secret, error)
return c.client.Logical().Read(path)
}

// RenewToken renews the supplied token for a given duration (in seconds) and
// adds it to the min-heap so that it is renewed periodically by the renewal
// loop. Any error returned during renewal will be written to a buffered
// channel and the channel is returned instead of an actual error. This helps
// the caller be notified of a renewal failure asynchronously for appropriate
// actions to be taken. The caller of this function need not have to close the
// error channel.
// RenewToken pushes the supplied token to the min-heap for an immediate
// renewal for a given duration (in seconds) and blocks until the renewal loop
// has processed it. The token is then renewed periodically until Stop() or
// StopRenewToken() is called.
//
// Any error returned during the periodical renewal will be written to a
// buffered channel and the channel is returned instead of an actual error.
// This helps the caller be notified of a renewal failure asynchronously for
// appropriate actions to be taken.
func (c *vaultClient) RenewToken(token string, increment int) (<-chan error, error) {
if token == "" {
err := fmt.Errorf("missing token")
Expand All @@ -358,38 +365,84 @@ func (c *vaultClient) RenewToken(token string, increment int) (<-chan error, err
return nil, err
}

// Create a buffered error channel
errCh := make(chan error, 1)

// Create a renewal request and indicate that the identifier in the
// request is a token and not a lease
renewalReq := &vaultClientRenewalRequest{
errCh: errCh,
id: token,
isToken: true,
increment: increment,
req := &vaultClientRenewalRequest{
renewalLoopCh: make(chan struct{}),
errCh: make(chan error, 1),
id: token,
isToken: true,
increment: increment,
}

// Push an immediate renewal request to the heap and block until a result
// is received.
err := c.pushRenewalRequest(req, time.Now())
if err != nil {
return nil, err
}

// Perform the renewal of the token and send any error to the dedicated
// error channel.
if err := c.renew(renewalReq); err != nil {
select {
case err := <-req.errCh:
c.logger.Error("error during renewal of token", "error", err)
metrics.IncrCounter([]string{"client", "vault", "renew_token_failure"}, 1)
return nil, err
case <-req.renewalLoopCh:
return req.errCh, nil
}
}

// pushRenewalRequest pushes a renewal request to the heap and triggers the
// renewal loop to re-fetch a new request.
func (c *vaultClient) pushRenewalRequest(req *vaultClientRenewalRequest, next time.Time) error {
c.lock.Lock()
defer c.lock.Unlock()

if !c.running {
return errors.New("token renewal loop is not running")
}

if !c.isTracked(req.id) {
err := c.heap.Push(req, next)
if err != nil {
return fmt.Errorf("failed to push renewal request to heap: %v", err)
}
} else {
err := c.heap.Update(req, next)
if err != nil {
return fmt.Errorf("failed to update renewal request: %v", err)
}
}

// Signal an update for the renewal loop to trigger a fresh computation for
// the next best candidate for renewal.
select {
case c.updateCh <- struct{}{}:
default:
}

return errCh, nil
return nil
}

// renew is a common method to handle renewal of both tokens and secret leases.
// It invokes a token renewal or a secret's lease renewal. If renewal is
// successful, min-heap is updated based on the duration after which it needs
// renewal again. The next renewal time is randomly selected to avoid spikes in
// the number of APIs periodically.
// Only tokens that are present in the heap are renewed.
func (c *vaultClient) renew(req *vaultClientRenewalRequest) error {
c.lock.Lock()
defer c.lock.Unlock()

// Always notify listeners that the request has been processed before
// exiting.
defer func() {
select {
case req.renewalLoopCh <- struct{}{}:
default:
}
}()

if req == nil {
return fmt.Errorf("nil renewal request")
}
Expand All @@ -414,6 +467,12 @@ func (c *vaultClient) renew(req *vaultClientRenewalRequest) error {
return fmt.Errorf("increment cannot be less than 1")
}

// Verify token is still in the heap before proceeding as it may have been
// removed while waiting for the renewal timer to tick.
if !c.isTracked(req.id) {
return nil
}

var renewalErr error
leaseDuration := req.increment
if req.isToken {
Expand Down Expand Up @@ -463,60 +522,25 @@ func (c *vaultClient) renew(req *vaultClientRenewalRequest) error {
"error", renewalErr, "period", next)
}

if c.isTracked(req.id) {
if fatal {
// If encountered with an error where in a lease or a
// token is not valid at all with vault, and if that
// item is tracked by the renewal loop, stop renewing
// it by removing the corresponding heap entry.
if err := c.heap.Remove(req.id); err != nil {
return fmt.Errorf("failed to remove heap entry: %v", err)
}

// Report the fatal error to the client
req.errCh <- renewalErr
close(req.errCh)

return renewalErr
if fatal {
// If encountered with an error where in a lease or a
// token is not valid at all with vault, and if that
// item is tracked by the renewal loop, stop renewing
// it by removing the corresponding heap entry.
if err := c.heap.Remove(req.id); err != nil {
return fmt.Errorf("failed to remove heap entry: %v", err)
}

// If the identifier is already tracked, this indicates a
// subsequest renewal. In this case, update the existing
// element in the heap with the new renewal time.
if err := c.heap.Update(req, next); err != nil {
return fmt.Errorf("failed to update heap entry. err: %v", err)
}

// There is no need to signal an update to the renewal loop
// here because this case is hit from the renewal loop itself.
} else {
if fatal {
// If encountered with an error where in a lease or a
// token is not valid at all with vault, and if that
// item is not tracked by renewal loop, don't add it.

// Report the fatal error to the client
req.errCh <- renewalErr
close(req.errCh)

return renewalErr
}
// Report the fatal error to the client
req.errCh <- renewalErr
close(req.errCh)

// If the identifier is not already tracked, this is a first
// renewal request. In this case, add an entry into the heap
// with the next renewal time.
if err := c.heap.Push(req, next); err != nil {
return fmt.Errorf("failed to push an entry to heap. err: %v", err)
}
return renewalErr
}

// Signal an update for the renewal loop to trigger a fresh
// computation for the next best candidate for renewal.
if c.running {
select {
case c.updateCh <- struct{}{}:
default:
}
}
// Update the element in the heap with the new renewal time.
if err := c.heap.Update(req, next); err != nil {
return fmt.Errorf("failed to update heap entry. err: %v", err)
}

return nil
Expand Down

0 comments on commit 7054fe1

Please sign in to comment.