Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VAULT-31755: Add removed and HA health to the sys/health endpoint #28991

Merged
merged 8 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 20 additions & 15 deletions api/sys_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func (c *Sys) HealthWithContext(ctx context.Context) (*HealthResponse, error) {
r.Params.Add("standbycode", "299")
r.Params.Add("drsecondarycode", "299")
r.Params.Add("performancestandbycode", "299")
r.Params.Add("removedcode", "299")
r.Params.Add("haunhealthycode", "299")

resp, err := c.c.rawRequestWithContext(ctx, r)
if err != nil {
Expand All @@ -38,19 +40,22 @@ func (c *Sys) HealthWithContext(ctx context.Context) (*HealthResponse, error) {
}

type HealthResponse struct {
Initialized bool `json:"initialized"`
Sealed bool `json:"sealed"`
Standby bool `json:"standby"`
PerformanceStandby bool `json:"performance_standby"`
ReplicationPerformanceMode string `json:"replication_performance_mode"`
ReplicationDRMode string `json:"replication_dr_mode"`
ServerTimeUTC int64 `json:"server_time_utc"`
Version string `json:"version"`
ClusterName string `json:"cluster_name,omitempty"`
ClusterID string `json:"cluster_id,omitempty"`
LastWAL uint64 `json:"last_wal,omitempty"`
Enterprise bool `json:"enterprise"`
EchoDurationMillis int64 `json:"echo_duration_ms"`
ClockSkewMillis int64 `json:"clock_skew_ms"`
ReplicationPrimaryCanaryAgeMillis int64 `json:"replication_primary_canary_age_ms"`
Initialized bool `json:"initialized"`
Sealed bool `json:"sealed"`
Standby bool `json:"standby"`
PerformanceStandby bool `json:"performance_standby"`
ReplicationPerformanceMode string `json:"replication_performance_mode"`
ReplicationDRMode string `json:"replication_dr_mode"`
ServerTimeUTC int64 `json:"server_time_utc"`
Version string `json:"version"`
ClusterName string `json:"cluster_name,omitempty"`
ClusterID string `json:"cluster_id,omitempty"`
LastWAL uint64 `json:"last_wal,omitempty"`
Enterprise bool `json:"enterprise"`
EchoDurationMillis int64 `json:"echo_duration_ms"`
ClockSkewMillis int64 `json:"clock_skew_ms"`
ReplicationPrimaryCanaryAgeMillis int64 `json:"replication_primary_canary_age_ms"`
RemovedFromCluster *bool `json:"removed_from_cluster,omitempty"`
HAConnectionHealthy *bool `json:"ha_connection_healthy,omitempty"`
LastRequestForwardingHeartbeatMillis int64 `json:"last_request_forwarding_heartbeat_ms,omitempty"`
}
6 changes: 6 additions & 0 deletions changelog/28991.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
```release-note:change
api: Add to sys/health whether the node has been removed from the HA cluster. If the node has been removed, return code 530 by default or the value of the `removedcode` query parameter.
```
```release-note:change
api: Add to sys/health whether the standby node has been able to successfully send heartbeats to the active node and the time in milliseconds since the last heartbeat. If the standby has been unable to send a heartbeat, return code 474 by default or the value of the `haunhealthycode` query parameter.
```
66 changes: 50 additions & 16 deletions http/sys_health.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,20 @@ func getSysHealth(core *vault.Core, r *http.Request) (int, *HealthResponse, erro
perfStandbyCode = code
}

haUnhealthyCode := 474
if code, found, ok := fetchStatusCode(r, "haunhealthycode"); !ok {
return http.StatusBadRequest, nil, nil
} else if found {
haUnhealthyCode = code
}

removedCode := 530
if code, found, ok := fetchStatusCode(r, "removedcode"); !ok {
return http.StatusBadRequest, nil, nil
} else if found {
removedCode = code
}

ctx := context.Background()

// Check system status
Expand All @@ -175,13 +189,21 @@ func getSysHealth(core *vault.Core, r *http.Request) (int, *HealthResponse, erro
return http.StatusInternalServerError, nil, err
}

removed, shouldIncludeRemoved := core.IsRemovedFromCluster()

haHealthy, lastHeartbeat := core.GetHAHeartbeatHealth()

// Determine the status code
code := activeCode
switch {
case !init:
code = uninitCode
case removed:
code = removedCode
case sealed:
code = sealedCode
case !haHealthy && lastHeartbeat != nil:
raskchanky marked this conversation as resolved.
Show resolved Hide resolved
code = haUnhealthyCode
case replicationState.HasState(consts.ReplicationDRSecondary):
code = drSecondaryCode
case perfStandby:
Expand Down Expand Up @@ -233,6 +255,15 @@ func getSysHealth(core *vault.Core, r *http.Request) (int, *HealthResponse, erro
return http.StatusInternalServerError, nil, err
}

if shouldIncludeRemoved {
body.RemovedFromCluster = &removed
}

if lastHeartbeat != nil {
body.LastRequestForwardingHeartbeatMillis = lastHeartbeat.Milliseconds()
body.HAConnectionHealthy = &haHealthy
}

if licenseState != nil {
body.License = &HealthResponseLicense{
State: licenseState.State,
Expand All @@ -257,20 +288,23 @@ type HealthResponseLicense struct {
}

type HealthResponse struct {
Initialized bool `json:"initialized"`
Sealed bool `json:"sealed"`
Standby bool `json:"standby"`
PerformanceStandby bool `json:"performance_standby"`
ReplicationPerformanceMode string `json:"replication_performance_mode"`
ReplicationDRMode string `json:"replication_dr_mode"`
ServerTimeUTC int64 `json:"server_time_utc"`
Version string `json:"version"`
Enterprise bool `json:"enterprise"`
ClusterName string `json:"cluster_name,omitempty"`
ClusterID string `json:"cluster_id,omitempty"`
LastWAL uint64 `json:"last_wal,omitempty"`
License *HealthResponseLicense `json:"license,omitempty"`
EchoDurationMillis int64 `json:"echo_duration_ms"`
ClockSkewMillis int64 `json:"clock_skew_ms"`
ReplicationPrimaryCanaryAgeMillis int64 `json:"replication_primary_canary_age_ms"`
Initialized bool `json:"initialized"`
Sealed bool `json:"sealed"`
Standby bool `json:"standby"`
PerformanceStandby bool `json:"performance_standby"`
ReplicationPerformanceMode string `json:"replication_performance_mode"`
ReplicationDRMode string `json:"replication_dr_mode"`
ServerTimeUTC int64 `json:"server_time_utc"`
Version string `json:"version"`
Enterprise bool `json:"enterprise"`
ClusterName string `json:"cluster_name,omitempty"`
ClusterID string `json:"cluster_id,omitempty"`
LastWAL uint64 `json:"last_wal,omitempty"`
License *HealthResponseLicense `json:"license,omitempty"`
EchoDurationMillis int64 `json:"echo_duration_ms"`
ClockSkewMillis int64 `json:"clock_skew_ms"`
ReplicationPrimaryCanaryAgeMillis int64 `json:"replication_primary_canary_age_ms"`
RemovedFromCluster *bool `json:"removed_from_cluster,omitempty"`
HAConnectionHealthy *bool `json:"ha_connection_healthy,omitempty"`
raskchanky marked this conversation as resolved.
Show resolved Hide resolved
LastRequestForwardingHeartbeatMillis int64 `json:"last_request_forwarding_heartbeat_ms,omitempty"`
}
27 changes: 27 additions & 0 deletions http/sys_health_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/hashicorp/vault/helper/constants"
"github.com/hashicorp/vault/sdk/helper/consts"
"github.com/hashicorp/vault/vault"
"github.com/stretchr/testify/require"
)

func TestSysHealth_get(t *testing.T) {
Expand Down Expand Up @@ -215,3 +216,29 @@ func TestSysHealth_head(t *testing.T) {
}
}
}

// TestSysHealth_Removed checks that a removed node returns a 530 and sets
// removed from cluster to be true. The test also checks that the removedcode
// query parameter is respected.
func TestSysHealth_Removed(t *testing.T) {
core, err := vault.TestCoreWithMockRemovableNodeHABackend(t, true)
require.NoError(t, err)
vault.TestCoreInit(t, core)
ln, addr := TestServer(t, core)
defer ln.Close()
raw, err := http.Get(addr + "/v1/sys/health")
require.NoError(t, err)
testResponseStatus(t, raw, 530)
healthResp := HealthResponse{}
testResponseBody(t, raw, &healthResp)
require.NotNil(t, healthResp.RemovedFromCluster)
require.True(t, *healthResp.RemovedFromCluster)

raw, err = http.Get(addr + "/v1/sys/health?removedcode=299")
require.NoError(t, err)
testResponseStatus(t, raw, 299)
secondHealthResp := HealthResponse{}
testResponseBody(t, raw, &secondHealthResp)
require.NotNil(t, secondHealthResp.RemovedFromCluster)
require.True(t, *secondHealthResp.RemovedFromCluster)
}
18 changes: 18 additions & 0 deletions vault/cluster/inmem_layer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,24 @@ func (l *InmemLayer) Listeners() []NetworkListener {
return []NetworkListener{l.listener}
}

// Partition forces the inmem layer to disconnect itself from peers and prevents
// creating new connections. The returned function will add all peers back
// and re-enable connections
func (l *InmemLayer) Partition() (unpartition func()) {
l.l.Lock()
peersCopy := make([]*InmemLayer, 0, len(l.peers))
for _, peer := range l.peers {
peersCopy = append(peersCopy, peer)
}
l.l.Unlock()
l.DisconnectAll()
return func() {
for _, peer := range peersCopy {
l.Connect(peer)
}
}
}

Comment on lines +119 to +136
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very neat!

// Dial implements NetworkLayer.
func (l *InmemLayer) Dial(addr string, timeout time.Duration, tlsConfig *tls.Config) (*tls.Conn, error) {
l.l.Lock()
Expand Down
3 changes: 3 additions & 0 deletions vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,6 +528,8 @@ type Core struct {
rpcClientConn *grpc.ClientConn
// The grpc forwarding client
rpcForwardingClient *forwardingClient
// The time of the last successful request forwarding heartbeat
rpcLastSuccessfulHeartbeat *atomic.Value
// The UUID used to hold the leader lock. Only set on active node
leaderUUID string

Expand Down Expand Up @@ -1091,6 +1093,7 @@ func CreateCore(conf *CoreConfig) (*Core, error) {
echoDuration: uberAtomic.NewDuration(0),
activeNodeClockSkewMillis: uberAtomic.NewInt64(0),
periodicLeaderRefreshInterval: conf.PeriodicLeaderRefreshInterval,
rpcLastSuccessfulHeartbeat: new(atomic.Value),
}

c.standbyStopCh.Store(make(chan struct{}))
Expand Down
125 changes: 125 additions & 0 deletions vault/external_tests/raft/raft_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,10 @@ import (
vaulthttp "github.com/hashicorp/vault/http"
"github.com/hashicorp/vault/internalshared/configutil"
"github.com/hashicorp/vault/physical/raft"
"github.com/hashicorp/vault/sdk/helper/jsonutil"
"github.com/hashicorp/vault/sdk/logical"
"github.com/hashicorp/vault/vault"
"github.com/hashicorp/vault/vault/cluster"
vaultseal "github.com/hashicorp/vault/vault/seal"
"github.com/stretchr/testify/require"
"golang.org/x/net/http2"
Expand Down Expand Up @@ -1390,3 +1392,126 @@ func TestRaftCluster_Removed(t *testing.T) {
require.Error(t, err)
require.True(t, follower.Sealed())
}

// TestSysHealth_Raft creates a raft cluster and verifies that the health status
// is OK for a healthy follower. The test partitions one of the nodes so that it
// can't send request forwarding RPCs. The test verifies that the status
// endpoint shows that HA isn't healthy. Finally, the test removes the
// partitioned follower and unpartitions it. The follower will learn that it has
// been removed, and should return the removed status.
func TestSysHealth_Raft(t *testing.T) {
parseHealthBody := func(t *testing.T, resp *api.Response) *vaulthttp.HealthResponse {
t.Helper()
health := vaulthttp.HealthResponse{}
defer resp.Body.Close()
require.NoError(t, jsonutil.DecodeJSONFromReader(resp.Body, &health))
return &health
}

opts := &vault.TestClusterOptions{
HandlerFunc: vaulthttp.Handler,
NumCores: 3,
InmemClusterLayers: true,
}
heartbeat := 500 * time.Millisecond
teststorage.RaftBackendSetup(nil, opts)
conf := &vault.CoreConfig{
ClusterHeartbeatInterval: heartbeat,
}
vaultCluster := vault.NewTestCluster(t, conf, opts)
defer vaultCluster.Cleanup()
testhelpers.WaitForActiveNodeAndStandbys(t, vaultCluster)
followerClient := vaultCluster.Cores[1].Client

t.Run("healthy", func(t *testing.T) {
resp, err := followerClient.Logical().ReadRawWithData("sys/health", map[string][]string{
"perfstandbyok": {"true"},
"standbyok": {"true"},
})
require.NoError(t, err)
require.Equal(t, resp.StatusCode, 200)
r := parseHealthBody(t, resp)
require.False(t, *r.RemovedFromCluster)
require.True(t, *r.HAConnectionHealthy)
require.Less(t, r.LastRequestForwardingHeartbeatMillis, 2*heartbeat.Milliseconds())
})
nl := vaultCluster.Cores[1].NetworkLayer()
inmem, ok := nl.(*cluster.InmemLayer)
require.True(t, ok)
unpartition := inmem.Partition()

t.Run("partition", func(t *testing.T) {
time.Sleep(2 * heartbeat)
var erroredResponse *api.Response
// the node isn't able to send/receive heartbeats, so it will have
// haunhealthy status.
testhelpers.RetryUntil(t, 3*time.Second, func() error {
resp, err := followerClient.Logical().ReadRawWithData("sys/health", map[string][]string{
"perfstandbyok": {"true"},
"standbyok": {"true"},
})
if err == nil {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
return errors.New("expected error")
}
if resp.StatusCode != 474 {
resp.Body.Close()
miagilepner marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("status code %d", resp.StatusCode)
}
erroredResponse = resp
return nil
})
r := parseHealthBody(t, erroredResponse)
require.False(t, *r.RemovedFromCluster)
require.False(t, *r.HAConnectionHealthy)
require.Greater(t, r.LastRequestForwardingHeartbeatMillis, 2*heartbeat.Milliseconds())

// ensure haunhealthycode is respected
resp, err := followerClient.Logical().ReadRawWithData("sys/health", map[string][]string{
"perfstandbyok": {"true"},
"standbyok": {"true"},
"haunhealthycode": {"299"},
})
require.NoError(t, err)
require.Equal(t, 299, resp.StatusCode)
resp.Body.Close()
})

t.Run("remove and unpartition", func(t *testing.T) {
leaderClient := vaultCluster.Cores[0].Client
_, err := leaderClient.Logical().Write("sys/storage/raft/remove-peer", map[string]interface{}{
"server_id": vaultCluster.Cores[1].NodeID,
})
require.NoError(t, err)
unpartition()

var erroredResponse *api.Response

// now that the node can connect again, it will start getting the removed
// error when trying to connect. The code should be removed, and the ha
// connection will be nil because there is no ha connection
testhelpers.RetryUntil(t, 10*time.Second, func() error {
resp, err := followerClient.Logical().ReadRawWithData("sys/health", map[string][]string{
"perfstandbyok": {"true"},
"standbyok": {"true"},
})
if err == nil {
if resp != nil && resp.Body != nil {
resp.Body.Close()
}
return fmt.Errorf("expected error")
}
if resp.StatusCode != 530 {
resp.Body.Close()
miagilepner marked this conversation as resolved.
Show resolved Hide resolved
return fmt.Errorf("status code %d", resp.StatusCode)
}
erroredResponse = resp
return nil
})
r := parseHealthBody(t, erroredResponse)
require.True(t, true, *r.RemovedFromCluster)
require.Nil(t, r.HAConnectionHealthy)
})
}
Loading
Loading