vault: fix token revocation during workflow migration (#19689)
When transitioning from the legacy token-based workflow to the new JWT
workflow for Vault, the previous code would instantiate a no-op Vault
client if the server configuration had a `default_identity` block.

This no-op client returned an error when some of its operations, such as
`LookupToken` and `RevokeTokens`, were called. The original intention was
that, in the new JWT workflow, none of these methods would be called, so
returning an error could help surface potential bugs.

But the `RevokeTokens` and `MarkForRevocation` methods _are_ called even
in the JWT flow. When a leadership transition happens, the new leader
looks for unused Vault accessors in state and tries to revoke them.
Similarly, the `RevokeTokens` method is called every time clients make
the `Node.UpdateStatus` and `Node.UpdateAlloc` RPCs, as the Nomad server
tries to find unused Vault tokens for the node or allocation.

Since the new JWT flow does not require Nomad servers to contact Vault,
calls to `RevokeTokens` and `MarkForRevocation` cannot complete without a
Vault token, so this commit changes the logic to use the no-op Vault
client when no token is configured. It also updates the no-op client to
log instead of returning an error when these methods are called, so
operators are made aware of Vault tokens created by Nomad that have not
been force-expired.

When migrating an existing cluster to the new workload identity based
flow, Nomad operators must first upgrade the Nomad version without
removing any of the existing Vault configuration. Removing the
configuration too early can prevent Nomad servers from managing and
cleaning up existing Vault tokens during a leadership transition and
node or alloc updates.

Operators must also resubmit all jobs with a `vault` block so they are
updated with an `identity` for Vault. Skipping this step may cause
allocations to fail if their Vault token expires (for example, if the
Nomad client stops running for longer than half the token's TTL) or if
they are rescheduled, since the new client will try to follow the legacy
flow, which fails once the Nomad server configuration for Vault has been
updated to remove the Vault address and token.
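As a rough sketch, a transitional server configuration during this migration might look like the following. The address, token, and identity fields are placeholders, and the exact parameter names should be checked against the Nomad `vault` configuration block documentation:

```hcl
# Transitional Nomad server configuration: keep the existing address and
# token so servers can still revoke legacy tokens, while also enabling
# the new workload identity flow.
vault {
  enabled = true

  # Legacy connection details -- keep these until all jobs have been
  # resubmitted with a Vault workload identity.
  address = "https://vault.example.com:8200"
  token   = "hvs.EXAMPLE-SERVER-TOKEN"

  # New JWT/workload identity flow (Nomad 1.7+).
  default_identity {
    aud = ["vault.io"]
    ttl = "1h"
  }
}
```

Only after all jobs have been resubmitted with a Vault identity is it safe to remove `address` and `token` from this block.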
lgfa29 authored Jan 10, 2024
1 parent d3e5cae commit 5267eec
Showing 7 changed files with 238 additions and 28 deletions.
3 changes: 3 additions & 0 deletions .changelog/19689.txt
@@ -0,0 +1,3 @@
```release-note:bug
vault: Fixed a bug that could cause errors during leadership transition when migrating to the new JWT and workload identity authentication workflow
```
20 changes: 20 additions & 0 deletions nomad/leader_test.go
@@ -818,6 +818,26 @@ func TestLeader_revokeVaultAccessorsOnRestore(t *testing.T) {
}
}

func TestLeader_revokeVaultAccessorsOnRestore_workloadIdentity(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.NumSchedulers = 0
	})
	defer cleanupS1()
	testutil.WaitForLeader(t, s1.RPC)

	// Insert a Vault accessor that should be revoked
	fsmState := s1.fsm.State()
	va := mock.VaultAccessor()
	err := fsmState.UpsertVaultAccessor(100, []*structs.VaultAccessor{va})
	must.NoError(t, err)

	// Do a restore
	err = s1.revokeVaultAccessorsOnRestore()
	must.NoError(t, err)
}

func TestLeader_revokeSITokenAccessorsOnRestore(t *testing.T) {
	ci.Parallel(t)
	r := require.New(t)
148 changes: 148 additions & 0 deletions nomad/node_endpoint_test.go
@@ -676,6 +676,49 @@ func TestClientEndpoint_Deregister_Vault(t *testing.T) {
}
}

func TestClientEndpoint_Deregister_Vault_WorkloadIdentity(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		// Enable Vault config and don't set any connection info to use the
		// workload identity flow.
		c.VaultConfigs[structs.VaultDefaultCluster].Enabled = pointer.Of(true)
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Register mock node.
	node := mock.Node()
	reg := &structs.NodeRegisterRequest{
		Node:         node,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var resp structs.GenericResponse
	err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)
	must.NoError(t, err)

	// Put some Vault accessors in the state store for that node
	var accessors []*structs.VaultAccessor
	for i := 0; i < 3; i++ {
		va := mock.VaultAccessor()
		va.NodeID = node.ID
		accessors = append(accessors, va)
	}
	state := s1.fsm.State()
	state.UpsertVaultAccessor(100, accessors)

	// Deregister the mock node and verify no error happens when Vault tokens
	// are revoked.
	dereg := &structs.NodeDeregisterRequest{
		NodeID:       node.ID,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var resp2 structs.GenericResponse
	err = msgpackrpc.CallWithCodec(codec, "Node.Deregister", dereg, &resp2)
	must.NoError(t, err)
}

func TestClientEndpoint_UpdateStatus(t *testing.T) {
	ci.Parallel(t)
	require := require.New(t)
@@ -815,6 +858,50 @@ func TestClientEndpoint_UpdateStatus_Vault(t *testing.T) {
}
}

func TestClientEndpoint_UpdateStatus_Vault_WorkloadIdentity(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		// Enable Vault config and don't set any connection info to use the
		// workload identity flow.
		c.VaultConfigs[structs.VaultDefaultCluster].Enabled = pointer.Of(true)
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Register mock node.
	node := mock.Node()
	reg := &structs.NodeRegisterRequest{
		Node:         node,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var resp structs.NodeUpdateResponse
	err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)
	must.NoError(t, err)

	// Put some Vault accessors in the state store for the node.
	var accessors []*structs.VaultAccessor
	for i := 0; i < 3; i++ {
		va := mock.VaultAccessor()
		va.NodeID = node.ID
		accessors = append(accessors, va)
	}
	state := s1.fsm.State()
	state.UpsertVaultAccessor(100, accessors)

	// Update the status to be down and verify no error when Vault tokens are
	// revoked.
	updateReq := &structs.NodeUpdateStatusRequest{
		NodeID:       node.ID,
		Status:       structs.NodeStatusDown,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var resp2 structs.NodeUpdateResponse
	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateStatus", updateReq, &resp2)
	must.NoError(t, err)
}

func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
	ci.Parallel(t)

@@ -3225,6 +3312,67 @@ func TestClientEndpoint_UpdateAlloc_Vault(t *testing.T) {
}
}

func TestClientEndpoint_UpdateAlloc_VaultWorkloadIdentity(t *testing.T) {
	ci.Parallel(t)

	s1, cleanupS1 := TestServer(t, func(c *Config) {
		// Enable Vault config and don't set any connection info to use the
		// workload identity flow.
		c.VaultConfigs[structs.VaultDefaultCluster].Enabled = pointer.Of(true)
	})
	defer cleanupS1()
	codec := rpcClient(t, s1)
	testutil.WaitForLeader(t, s1.RPC)

	// Create the node register request.
	node := mock.Node()
	reg := &structs.NodeRegisterRequest{
		Node:         node,
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var resp structs.GenericResponse
	err := msgpackrpc.CallWithCodec(codec, "Node.Register", reg, &resp)
	must.NoError(t, err)

	// Inject allocation and a few Vault accessors.
	alloc := mock.Alloc()
	alloc.NodeID = node.ID
	state := s1.fsm.State()
	state.UpsertJobSummary(99, mock.JobSummary(alloc.JobID))
	err = state.UpsertAllocs(structs.MsgTypeTestSetup, 100, []*structs.Allocation{alloc})
	must.NoError(t, err)

	var accessors []*structs.VaultAccessor
	for i := 0; i < 3; i++ {
		va := mock.VaultAccessor()
		va.NodeID = node.ID
		va.AllocID = alloc.ID
		accessors = append(accessors, va)
	}
	err = state.UpsertVaultAccessor(101, accessors)
	must.NoError(t, err)

	// Inject mock job.
	job := mock.Job()
	job.ID = alloc.JobID
	err = state.UpsertJob(structs.MsgTypeTestSetup, 101, nil, job)
	must.NoError(t, err)

	// Update alloc status and verify no error happens when the orphaned Vault
	// tokens are revoked.
	clientAlloc := new(structs.Allocation)
	*clientAlloc = *alloc
	clientAlloc.ClientStatus = structs.AllocClientStatusFailed

	update := &structs.AllocUpdateRequest{
		Alloc:        []*structs.Allocation{clientAlloc},
		WriteRequest: structs.WriteRequest{Region: "global"},
	}
	var resp2 structs.NodeAllocsResponse
	err = msgpackrpc.CallWithCodec(codec, "Node.UpdateAlloc", update, &resp2)
	must.NoError(t, err)
}

func TestClientEndpoint_CreateNodeEvals(t *testing.T) {
	ci.Parallel(t)

23 changes: 20 additions & 3 deletions nomad/server.go
@@ -893,7 +893,24 @@ func (s *Server) Reload(newConfig *Config) error {

	// Handle the Vault reload. Vault should never be nil but just guard.
	if s.vault != nil {
		vconfig := newConfig.GetDefaultVault()

		// Verify if the new configuration would cause the client type to
		// change.
		var err error
		switch s.vault.(type) {
		case *NoopVault:
			if vconfig != nil && vconfig.Token != "" {
				err = fmt.Errorf("setting a Vault token requires restarting the Nomad agent")
			}
		case *vaultClient:
			if vconfig != nil && vconfig.Token == "" {
				err = fmt.Errorf("removing the Vault token requires restarting the Nomad agent")
			}
		}
		if err != nil {
			_ = multierror.Append(&mErr, err)
		} else if err := s.vault.SetConfig(newConfig.GetDefaultVault()); err != nil {
			_ = multierror.Append(&mErr, err)
		}
	}
@@ -1192,8 +1209,8 @@ func (s *Server) setupConsul(consulConfigFunc consul.ConfigAPIFunc, consulACLs c
// setupVaultClient is used to set up the Vault API client.
func (s *Server) setupVaultClient() error {
	vconfig := s.config.GetDefaultVault()
	if vconfig != nil && vconfig.DefaultIdentity != nil {
		s.vault = &NoopVault{}
	if vconfig != nil && vconfig.Token == "" {
		s.vault = NewNoopVault(s.logger)
		return nil
	}
Expand Down
25 changes: 12 additions & 13 deletions nomad/server_test.go
@@ -205,32 +205,31 @@ func TestServer_Regions(t *testing.T) {
func TestServer_Reload_Vault(t *testing.T) {
	ci.Parallel(t)

	token := uuid.Generate()
	s1, cleanupS1 := TestServer(t, func(c *Config) {
		c.Region = "global"
		c.GetDefaultVault().Token = token
	})
	defer cleanupS1()

	if s1.vault.Running() {
		t.Fatalf("Vault client should not be running")
	}
	must.False(t, s1.vault.Running())

	tr := true
	config := DefaultConfig()
	config.GetDefaultVault().Enabled = &tr
	config.GetDefaultVault().Token = uuid.Generate()
	config.GetDefaultVault().Token = token
	config.GetDefaultVault().Namespace = "nondefault"

	if err := s1.Reload(config); err != nil {
		t.Fatalf("Reload failed: %v", err)
	}
	err := s1.Reload(config)
	must.NoError(t, err)

	if !s1.vault.Running() {
		t.Fatalf("Vault client should be running")
	}
	must.True(t, s1.vault.Running())
	must.Eq(t, "nondefault", s1.vault.GetConfig().Namespace)

	if s1.vault.GetConfig().Namespace != "nondefault" {
		t.Fatalf("Vault client did not get new namespace")
	}
	// Removing the token requires agent restart.
	config.GetDefaultVault().Token = ""
	err = s1.Reload(config)
	must.ErrorContains(t, err, "requires restarting the Nomad agent")
}

func connectionReset(msg string) bool {
26 changes: 20 additions & 6 deletions nomad/vault_noop.go
@@ -9,6 +9,7 @@ import (
	"sync"
	"time"

	log "github.com/hashicorp/go-hclog"
	"github.com/hashicorp/nomad/nomad/structs"
	"github.com/hashicorp/nomad/nomad/structs/config"
	vapi "github.com/hashicorp/vault/api"
@@ -17,6 +18,13 @@
type NoopVault struct {
	l      sync.Mutex
	config *config.VaultConfig
	logger log.Logger
}

func NewNoopVault(logger log.Logger) *NoopVault {
	return &NoopVault{
		logger: logger.Named("vault-noop"),
	}
}

func (v *NoopVault) SetActive(_ bool) {}
@@ -37,19 +45,25 @@ func (v *NoopVault) GetConfig() *config.VaultConfig {
}

func (v *NoopVault) CreateToken(_ context.Context, _ *structs.Allocation, _ string) (*vapi.Secret, error) {
	return nil, errors.New("Vault client not able to create tokens")
	return nil, errors.New("Nomad server is not configured to create tokens")
}

func (v *NoopVault) LookupToken(_ context.Context, _ string) (*vapi.Secret, error) {
	return nil, errors.New("Vault client not able to lookup tokens")
	return nil, errors.New("Nomad server is not configured to lookup tokens")
}

func (v *NoopVault) RevokeTokens(_ context.Context, _ []*structs.VaultAccessor, _ bool) error {
	return errors.New("Vault client not able to revoke tokens")
func (v *NoopVault) RevokeTokens(_ context.Context, tokens []*structs.VaultAccessor, _ bool) error {
	for _, t := range tokens {
		v.logger.Debug("Vault token is no longer used, but Nomad is not able to revoke it. The token may need to be revoked manually or will expire once its TTL reaches zero.", "accessor", t.Accessor, "ttl", t.CreationTTL)
	}
	return nil
}

func (v *NoopVault) MarkForRevocation(accessors []*structs.VaultAccessor) error {
	return errors.New("Vault client not able to revoke tokens")
func (v *NoopVault) MarkForRevocation(tokens []*structs.VaultAccessor) error {
	for _, t := range tokens {
		v.logger.Debug("Vault token is no longer used, but Nomad is not able to mark it for revocation. The token may need to be revoked manually or will expire once its TTL reaches zero.", "accessor", t.Accessor, "ttl", t.CreationTTL)
	}
	return nil
}

func (v *NoopVault) Stop() {}
21 changes: 15 additions & 6 deletions website/content/docs/integrations/vault/acl.mdx
@@ -751,18 +751,26 @@ $ VAULT_TOKEN=s.H39hfS7eHSbb1GpkdzOQLTmz.fvuLy nomad job run vault.nomad
## Migrating to Using Workload Identity with Vault

Migrating from the legacy (pre-1.7) workflow where workloads use the agent's
Vault token requires configuation on your Vault cluster and your Nomad server
Vault token requires configuration on your Vault cluster and your Nomad server
agents. It does not require updating your running Nomad jobs unless you wish to
specify a non-default role. To migrate:

* Create the Vault auth method, default role, and policies on your Vault
cluster.
* Enable [`vault.default_identity`][] blocks in your Nomad server agent
configurations.
* (Optionally) Add [`vault.role`][] fields to any Nomad jobs that will not use
the default role.
* (Optionally) add [`identity`][] blocks to your jobs if you want to use a
different identity because of how your auth method and roles are configured.
configurations, but **do not modify any of the existing Vault
configuration**.
* Upgrade your cluster following the documented [Upgrade
Process][docs_upgrade].
* Resubmit Nomad jobs that need access to Vault to redeploy them with a new
workload identity for Vault.
* (Optionally) Add [`vault.role`][] fields to any Nomad jobs that will not
use the default role.
* (Optionally) add [`identity`][] blocks to your jobs if you want to use a
different identity because of how your auth method and roles are
configured.
* Once all jobs have been resubmitted, you may remove parameters no longer used
by the Nomad server agents from the [`vault`][config] configuration block.
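For illustration, the resubmission step typically involves no job change beyond re-running `nomad job run` on a job that already has a `vault` block, similar to the sketch below. The role name is a placeholder and is only needed when a job should not use the default role configured on the Vault auth method:

```hcl
job "web" {
  group "app" {
    task "server" {
      driver = "docker"

      # Resubmitting a job with a `vault` block causes Nomad to inject a
      # workload identity for Vault under the new flow.
      vault {
        role = "nomad-workloads" # placeholder, optional
      }
    }
  }
}
```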

[Variables]: /nomad/docs/concepts/variables
[Vault Namespaces]: /vault/docs/enterprise/namespaces
Expand All @@ -785,6 +793,7 @@ specify a non-default role. To migrate:
[allow_unauth]: /nomad/docs/configuration/vault#allow_unauthenticated
[auth]: /vault/docs/auth/token 'Vault Authentication Backend'
[config]: /nomad/docs/configuration/vault 'Nomad Vault Configuration Block'
[docs_upgrade]: /nomad/docs/upgrade#upgrade-process
[ent]: #enterprise-configuration
[img_vault_auth_method]: /img/vault-integration-auth-method.png
[img_vault_auth_overview]: /img/vault-integration-auth-overview.png
