Backport of heartbeat: use leader's ACL token when failing heartbeat into release/1.9.x #24248

Merged
3 changes: 3 additions & 0 deletions .changelog/24241.txt
@@ -0,0 +1,3 @@
```release-note:bug
heartbeat: Fixed a bug where failed nodes would not be marked down
```
4 changes: 4 additions & 0 deletions nomad/auth/auth.go
@@ -93,6 +93,10 @@ func NewAuthenticator(cfg *AuthenticatorConfig) *Authenticator {
// an ephemeral ACLToken makes the origin of the credential clear to RPC
// handlers, who may have different behavior for internal vs external origins.
//
// Note: when making a server-to-server RPC that authenticates with this method,
// the RPC *must* include the leader's ACL token. Use AuthenticateServerOnly for
// requests that don't have access to the leader's ACL token.
//
// Note: when called on the follower we'll be making stale queries, so it's
// possible if the follower is behind that the leader will get a different value
// if an ACL token or allocation's WI has just been created.
3 changes: 2 additions & 1 deletion nomad/heartbeat.go
@@ -163,7 +163,8 @@ func (h *nodeHeartbeater) invalidateHeartbeat(id string) {
Status: structs.NodeStatusDown,
NodeEvent: structs.NewNodeEvent().SetSubsystem(structs.NodeEventSubsystemCluster).SetMessage(NodeHeartbeatEventMissed),
WriteRequest: structs.WriteRequest{
Region: h.srv.config.Region,
Region: h.srv.config.Region,
AuthToken: h.srv.getLeaderAcl(),
},
}

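Taken with the auth.go note above, this one-field change is the whole fix: invalidateHeartbeat issues a server-originated node-down write, and with ACLs enabled an anonymous write is denied, so failed nodes were never marked down. For context, here is a minimal sketch of how the full request is plausibly assembled and submitted. The request and response types, the RPC name, and the error handling are assumptions for illustration; only the fields shown in the diff are taken from it.

```go
// Sketch, not the PR's code: NodeUpdateStatusRequest, NodeUpdateResponse, and
// the "Node.UpdateStatus" RPC name are assumed; the AuthToken line is the fix
// shown in the diff above.
req := &structs.NodeUpdateStatusRequest{
	NodeID: id,
	Status: structs.NodeStatusDown,
	NodeEvent: structs.NewNodeEvent().
		SetSubsystem(structs.NodeEventSubsystemCluster).
		SetMessage(NodeHeartbeatEventMissed),
	WriteRequest: structs.WriteRequest{
		Region:    h.srv.config.Region,
		AuthToken: h.srv.getLeaderAcl(), // authenticate the write as the leader instead of anonymously
	},
}
var resp structs.NodeUpdateResponse
if err := h.srv.RPC("Node.UpdateStatus", req, &resp); err != nil {
	h.logger.Error("update node status failed", "error", err, "node_id", id)
}
```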
38 changes: 23 additions & 15 deletions nomad/node_endpoint_test.go
@@ -952,7 +952,7 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
// Setup server with tighter heartbeat so we don't have to wait so long
// for nodes to go down.
heartbeatTTL := time.Duration(500*testutil.TestMultiplier()) * time.Millisecond
s, cleanupS := TestServer(t, func(c *Config) {
s, rootToken, cleanupS := TestACLServer(t, func(c *Config) {
c.MinHeartbeatTTL = heartbeatTTL
c.HeartbeatGrace = 2 * heartbeatTTL
})
@@ -1001,7 +1001,8 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
go heartbeat(heartbeatCtx)

// Wait for node to be ready.
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
structs.NodeStatusReady, rootToken.SecretID)

// Register job with Disconnect.LostAfter
job := version.jobSpec(time.Hour)
@@ -1018,22 +1019,24 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
WriteRequest: structs.WriteRequest{
Region: "global",
Namespace: job.Namespace,
AuthToken: rootToken.SecretID,
},
}
var jobResp structs.JobRegisterResponse
err = msgpackrpc.CallWithCodec(codec, "Job.Register", jobReq, &jobResp)
must.NoError(t, err)

// Wait for alloc to be pending in the server.
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
testutil.WaitForJobAllocStatusWithToken(t, s.RPC, job, map[string]int{
structs.AllocClientStatusPending: 1,
})
}, rootToken.SecretID)

// Get allocs that node should run.
allocsReq := &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
Region: "global",
AuthToken: rootToken.SecretID,
},
}
var allocsResp structs.NodeAllocsResponse
@@ -1058,17 +1061,18 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
must.NoError(t, err)

// Wait for alloc to be running in the server.
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
testutil.WaitForJobAllocStatusWithToken(t, s.RPC, job, map[string]int{
structs.AllocClientStatusRunning: 1,
})
}, rootToken.SecretID)

// Stop heartbeat and wait for the client to be disconnected and the alloc
// to be unknown.
cancelHeartbeat()
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusDisconnected)
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
structs.NodeStatusDisconnected, rootToken.SecretID)
testutil.WaitForJobAllocStatusWithToken(t, s.RPC, job, map[string]int{
structs.AllocClientStatusUnknown: 1,
})
}, rootToken.SecretID)

// Restart heartbeat to reconnect node.
heartbeatCtx, cancelHeartbeat = context.WithCancel(context.Background())
@@ -1081,15 +1085,17 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
// allocs status with the server so the scheduler has the necessary
// information to avoid unnecessary placements.
time.Sleep(3 * heartbeatTTL)
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusInit)
testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
structs.NodeStatusInit, rootToken.SecretID)

// Get allocs that node should run.
// The node should only have one alloc assigned until it updates its allocs
// status with the server.
allocsReq = &structs.NodeSpecificRequest{
NodeID: node.ID,
QueryOptions: structs.QueryOptions{
Region: "global",
Region: "global",
AuthToken: rootToken.SecretID,
},
}
err = msgpackrpc.CallWithCodec(codec, "Node.GetAllocs", allocsReq, &allocsResp)
@@ -1104,10 +1110,12 @@ func TestClientEndpoint_UpdateStatus_Reconnect(t *testing.T) {
// - client status is ready.
// - only 1 alloc and the alloc is running.
// - all evals are terminal, so cluster is in a stable state.
testutil.WaitForClientStatus(t, s.RPC, node.ID, "global", structs.NodeStatusReady)
testutil.WaitForJobAllocStatus(t, s.RPC, job, map[string]int{
testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
structs.NodeStatusReady, rootToken.SecretID)
testutil.WaitForJobAllocStatusWithToken(t, s.RPC, job, map[string]int{
structs.AllocClientStatusRunning: 1,
})
}, rootToken.SecretID)

testutil.WaitForResult(func() (bool, error) {
state := s.fsm.State()
ws := memdb.NewWatchSet()
18 changes: 14 additions & 4 deletions testutil/wait.go
@@ -193,17 +193,27 @@ func WaitForClient(t testing.TB, rpc rpcFn, nodeID string, region string) {
WaitForClientStatus(t, rpc, nodeID, region, structs.NodeStatusReady)
}

// WaitForClientStatus blocks until the client is in the expected status.
func WaitForClientStatus(t testing.TB, rpc rpcFn, nodeID string, region string, status string) {
// WaitForClientStatus blocks until the client is in the expected status
func WaitForClientStatus(t testing.TB, rpc rpcFn, nodeID, region, status string) {
t.Helper()
WaitForClientStatusWithToken(t, rpc, nodeID, region, status, "")
}

// WaitForClientStatusWithToken blocks until the client is in the expected
// status, for use with ACLs enabled
func WaitForClientStatusWithToken(t testing.TB, rpc rpcFn, nodeID, region, status, token string) {
t.Helper()

if region == "" {
region = "global"
}
WaitForResult(func() (bool, error) {
req := structs.NodeSpecificRequest{
NodeID: nodeID,
QueryOptions: structs.QueryOptions{Region: region},
NodeID: nodeID,
QueryOptions: structs.QueryOptions{
Region: region,
AuthToken: token,
},
}
var out structs.SingleNodeResponse

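For completeness, a hypothetical usage snippet showing how the new helper pairs with an ACL-enabled test server. The three-value TestACLServer signature mirrors the node_endpoint_test.go changes above; the heartbeat TTL value and the elided node registration are illustrative only.

```go
// Hypothetical test snippet mirroring the changes above: with ACLs enabled,
// the token-less WaitForClientStatus is typically rejected, so the token-aware
// variant is used with the root token returned by TestACLServer.
s, rootToken, cleanupS := TestACLServer(t, func(c *Config) {
	c.MinHeartbeatTTL = 100 * time.Millisecond // illustrative value
})
defer cleanupS()

// ... register a node and start heartbeating as in the test above ...

testutil.WaitForClientStatusWithToken(t, s.RPC, node.ID, "global",
	structs.NodeStatusReady, rootToken.SecretID)
```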