Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "operator members" command to list nodes in the cluster. #13292

Merged
merged 8 commits into from
Nov 30, 2021
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions changelog/13292.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:improvement
core/ha: Add new mechanism for keeping track of peers talking to active node, and new 'operator members' command to view them.
```
5 changes: 5 additions & 0 deletions command/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -461,6 +461,11 @@ func initCommands(ui, serverCmdUi cli.Ui, runOpts *RunOptions) {
BaseCommand: getBaseCommand(),
}, nil
},
"operator members": func() (cli.Command, error) {
return &OperatorMembersCommand{
BaseCommand: getBaseCommand(),
}, nil
},
"path-help": func() (cli.Command, error) {
return &PathHelpCommand{
BaseCommand: getBaseCommand(),
Expand Down
107 changes: 107 additions & 0 deletions command/operator_members.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package command

import (
"context"
"fmt"
"strings"
"time"

"github.com/mitchellh/cli"
"github.com/posener/complete"
)

var (
_ cli.Command = (*OperatorMembersCommand)(nil)
_ cli.CommandAutocomplete = (*OperatorMembersCommand)(nil)
)

type OperatorMembersCommand struct {
*BaseCommand
}

func (c *OperatorMembersCommand) Synopsis() string {
return "Returns the list of nodes in the cluster"
}

func (c *OperatorMembersCommand) Help() string {
helpText := `
Usage: vault operator members

Provides the details of all the nodes in the cluster.

$ vault operator members

` + c.Flags().Help()

return strings.TrimSpace(helpText)
}

func (c *OperatorMembersCommand) Flags() *FlagSets {
set := c.flagSet(FlagSetHTTP | FlagSetOutputFormat)

return set
}

func (c *OperatorMembersCommand) AutocompleteArgs() complete.Predictor {
return complete.PredictAnything
}

func (c *OperatorMembersCommand) AutocompleteFlags() complete.Flags {
return c.Flags().Completions()
}

func (c *OperatorMembersCommand) Run(args []string) int {
f := c.Flags()

if err := f.Parse(args); err != nil {
c.UI.Error(err.Error())
return 1
}

client, err := c.Client()
if err != nil {
c.UI.Error(err.Error())
return 2
}

r := client.NewRequest("GET", "/v1/sys/ha-status")
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
resp, err := client.RawRequestWithContext(ctx, r)
if err != nil {
return 1
}
defer resp.Body.Close()

var result HaStatusResponse
err = resp.DecodeJSON(&result)

if err != nil {
c.UI.Error(err.Error())
return 2
}

switch Format(c.UI) {
case "table":
out := []string{"Host Name | API Address | Cluster Address | ActiveNode | Last Echo"}
for _, node := range result.Nodes {
out = append(out, fmt.Sprintf("%s | %s | %s | %t | %s", node.Hostname, node.APIAddress, node.ClusterAddress, node.ActiveNode, node.LastEcho))
}
c.UI.Output(tableOutput(out, nil))
return 0
default:
return OutputData(c.UI, result)
}
}

type Node struct {
Hostname string `json:"hostname"`
APIAddress string `json:"api_address"`
ClusterAddress string `json:"cluster_address"`
ActiveNode bool `json:"active_node"`
LastEcho *time.Time `json:"last_echo"`
}

type HaStatusResponse struct {
Nodes []Node
}
3 changes: 1 addition & 2 deletions http/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ func Handler(props *vault.HandlerProperties) http.Handler {
mux.Handle("/v1/sys/step-down", handleRequestForwarding(core, handleSysStepDown(core)))
mux.Handle("/v1/sys/unseal", handleSysUnseal(core))
mux.Handle("/v1/sys/leader", handleSysLeader(core))
mux.Handle("/v1/sys/ha-status", handleRequestForwarding(core, handleSysHaStatus(core)))
mux.Handle("/v1/sys/health", handleSysHealth(core))
mux.Handle("/v1/sys/monitor", handleLogicalNoForward(core))
mux.Handle("/v1/sys/generate-root/attempt", handleRequestForwarding(core,
Expand Down Expand Up @@ -213,7 +214,6 @@ func Handler(props *vault.HandlerProperties) http.Handler {
return printablePathCheckHandler
}


type copyResponseWriter struct {
wrapped http.ResponseWriter
statusCode int
Expand Down Expand Up @@ -304,7 +304,6 @@ func wrapGenericHandler(core *vault.Core, h http.Handler, props *vault.HandlerPr
hostname, _ := os.Hostname()

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {

// This block needs to be here so that upon sending SIGHUP, custom response
// headers are also reloaded into the handlers.
var customHeaders map[string][]*logical.CustomHeader
Expand Down
68 changes: 68 additions & 0 deletions http/sys_ha_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package http

import (
"net/http"
"time"

"github.com/hashicorp/errwrap"
"github.com/hashicorp/vault/vault"
"github.com/shirou/gopsutil/host"
)

func handleSysHaStatus(core *vault.Core) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
handleSysHaStatusGet(core, w, r)
default:
respondError(w, http.StatusMethodNotAllowed, nil)
}
})
}

func handleSysHaStatusGet(core *vault.Core, w http.ResponseWriter, r *http.Request) {
_, address, clusterAddr, err := core.Leader()
if errwrap.Contains(err, vault.ErrHANotEnabled.Error()) {
err = nil
}
if err != nil {
respondError(w, http.StatusInternalServerError, err)
return
}
h, _ := host.Info()
nodes := []Node{
{
Hostname: h.Hostname,
ncabatoff marked this conversation as resolved.
Show resolved Hide resolved
APIAddress: address,
ClusterAddress: clusterAddr,
ActiveNode: true,
},
}

for _, peerNode := range core.GetHAPeerNodesCached() {
lastEcho := peerNode.LastEcho
nodes = append(nodes, Node{
Hostname: peerNode.Hostname,
APIAddress: peerNode.APIAddress,
ClusterAddress: peerNode.ClusterAddress,
LastEcho: &lastEcho,
})
}
resp := &HaStatusResponse{
Nodes: nodes,
}

respondOk(w, resp)
}

type Node struct {
Hostname string `json:"hostname"`
APIAddress string `json:"api_address"`
ClusterAddress string `json:"cluster_address"`
ActiveNode bool `json:"active_node"`
LastEcho *time.Time `json:"last_echo"`
}

type HaStatusResponse struct {
Nodes []Node
}
63 changes: 63 additions & 0 deletions http/sys_ha_status_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package http

import (
"context"
"fmt"
"testing"
"time"

"github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/helper/testhelpers"
"github.com/hashicorp/vault/sdk/helper/logging"
"github.com/hashicorp/vault/sdk/physical"
"github.com/hashicorp/vault/sdk/physical/inmem"
"github.com/hashicorp/vault/vault"
)

func TestSysHAStatus(t *testing.T) {
logger := logging.NewVaultLogger(hclog.Trace)
inm, err := inmem.NewTransactionalInmem(nil, logger)
if err != nil {
t.Fatal(err)
}
inmha, err := inmem.NewInmemHA(nil, logger)
if err != nil {
t.Fatal(err)
}

conf := &vault.CoreConfig{
Physical: inm,
HAPhysical: inmha.(physical.HABackend),
}
opts := &vault.TestClusterOptions{
HandlerFunc: Handler,
}
cluster := vault.NewTestCluster(t, conf, opts)
cluster.Start()
defer cluster.Cleanup()
testhelpers.WaitForActiveNodeAndStandbys(t, cluster)

testhelpers.RetryUntil(t, 10*time.Second, func() error {
// Use standby deliberately to make sure it forwards
client := cluster.Cores[1].Client
r := client.NewRequest("GET", "/v1/sys/ha-status")
ctx, cancelFunc := context.WithCancel(context.Background())
defer cancelFunc()
resp, err := client.RawRequestWithContext(ctx, r)
if err != nil {
t.Fatal(err)
}
defer resp.Body.Close()

var result HaStatusResponse
err = resp.DecodeJSON(&result)
if err != nil {
t.Fatal(err)
}

if len(result.Nodes) != len(cluster.Cores) {
return fmt.Errorf("expected %d nodes, got %d", len(cluster.Cores), len(result.Nodes))
}
return nil
})
}
22 changes: 22 additions & 0 deletions vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -2946,3 +2946,25 @@ type LicenseState struct {
ExpiryTime time.Time
Terminated bool
}

type PeerNode struct {
Hostname string `json:"hostname"`
APIAddress string `json:"api_address"`
ClusterAddress string `json:"cluster_address"`
LastEcho time.Time `json:"last_echo"`
}
ncabatoff marked this conversation as resolved.
Show resolved Hide resolved

// GetHAPeerNodesCached returns the nodes that've sent us Echo requests recently.
func (c *Core) GetHAPeerNodesCached() []PeerNode {
var nodes []PeerNode
for itemClusterAddr, item := range c.clusterPeerClusterAddrsCache.Items() {
info := item.Object.(nodeHAConnectionInfo)
nodes = append(nodes, PeerNode{
Hostname: info.nodeInfo.NodeID,
APIAddress: info.nodeInfo.ApiAddr,
ClusterAddress: itemClusterAddr,
LastEcho: info.lastHeartbeat,
})
}
return nodes
}
22 changes: 19 additions & 3 deletions vault/request_forwarding_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/hashicorp/vault/helper/forwarding"
"github.com/hashicorp/vault/physical/raft"
"github.com/hashicorp/vault/vault/replication"
"github.com/shirou/gopsutil/host"
)

type forwardedRequestRPCServer struct {
Expand Down Expand Up @@ -71,9 +72,18 @@ func (s *forwardedRequestRPCServer) ForwardRequest(ctx context.Context, freq *fo
return resp, nil
}

type nodeHAConnectionInfo struct {
nodeInfo *NodeInformation
lastHeartbeat time.Time
}

func (s *forwardedRequestRPCServer) Echo(ctx context.Context, in *EchoRequest) (*EchoReply, error) {
incomingNodeConnectionInfo := nodeHAConnectionInfo{
nodeInfo: in.NodeInfo,
lastHeartbeat: time.Now(),
}
if in.ClusterAddr != "" {
s.core.clusterPeerClusterAddrsCache.Set(in.ClusterAddr, nil, 0)
s.core.clusterPeerClusterAddrsCache.Set(in.ClusterAddr, incomingNodeConnectionInfo, 0)
}

if in.RaftAppliedIndex > 0 && len(in.RaftNodeID) > 0 && s.raftFollowerStates != nil {
Expand Down Expand Up @@ -106,12 +116,18 @@ type forwardingClient struct {
// with these requests it's useful to keep this as well
func (c *forwardingClient) startHeartbeat() {
go func() {
clusterAddr := c.core.ClusterAddr()
h, _ := host.Info()
ni := NodeInformation{
ApiAddr: c.core.redirectAddr,
NodeID: h.Hostname,
Mode: "standby",
}
tick := func() {
clusterAddr := c.core.ClusterAddr()

req := &EchoRequest{
Message: "ping",
ClusterAddr: clusterAddr,
NodeInfo: &ni,
}

if raftBackend := c.core.getRaftBackend(); raftBackend != nil {
Expand Down
Loading