Skip to content

Commit

Permalink
OSS portions of raft non-voters (#7634)
Browse files Browse the repository at this point in the history
* OSS portions of raft non-voters

* add file

* Update vault/raft.go

Co-Authored-By: Vishal Nayak <[email protected]>
  • Loading branch information
briankassouf and vishalnayak authored Oct 11, 2019
1 parent aa61c2d commit 68750b7
Show file tree
Hide file tree
Showing 11 changed files with 80 additions and 35 deletions.
1 change: 1 addition & 0 deletions api/sys_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type RaftJoinRequest struct {
LeaderClientCert string `json:"leader_client_cert"`
LeaderClientKey string `json:"leader_client_key"`
Retry bool `json:"retry"`
NonVoter bool `json:"non_voter"`
}

// RaftJoin adds the node from which this call is invoked from to the raft
Expand Down
15 changes: 12 additions & 3 deletions command/operator_raft_join.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@ var _ cli.Command = (*OperatorRaftJoinCommand)(nil)
var _ cli.CommandAutocomplete = (*OperatorRaftJoinCommand)(nil)

type OperatorRaftJoinCommand struct {
flagRaftRetry bool
flagRetry bool
flagLeaderCACert string
flagLeaderClientCert string
flagLeaderClientKey string
flagNonVoter bool
*BaseCommand
}

Expand Down Expand Up @@ -66,11 +67,18 @@ func (c *OperatorRaftJoinCommand) Flags() *FlagSets {

f.BoolVar(&BoolVar{
Name: "retry",
Target: &c.flagRaftRetry,
Target: &c.flagRetry,
Default: false,
Usage: "Continuously retry joining the raft cluster upon failures.",
})

f.BoolVar(&BoolVar{
Name: "non-voter",
Target: &c.flagNonVoter,
Default: false,
Usage: "(Enterprise-only) This flag is used to make the server not participate in the Raft quorum, and have it only receive the data replication stream. This can be used to add read scalability to a cluster in cases where a high volume of reads to servers are needed.",
})

return set
}

Expand Down Expand Up @@ -117,7 +125,8 @@ func (c *OperatorRaftJoinCommand) Run(args []string) int {
LeaderCACert: c.flagLeaderCACert,
LeaderClientCert: c.flagLeaderClientCert,
LeaderClientKey: c.flagLeaderClientKey,
Retry: c.flagRaftRetry,
Retry: c.flagRetry,
NonVoter: c.flagNonVoter,
})
if err != nil {
c.UI.Error(fmt.Sprintf("Error joining the node to the raft cluster: %s", err))
Expand Down
4 changes: 2 additions & 2 deletions helper/testhelpers/testhelpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) {
{
core := cluster.Cores[1]
core.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
_, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false)
_, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false, false)
if err != nil {
t.Fatal(err)
}
Expand All @@ -340,7 +340,7 @@ func RaftClusterJoinNodes(t testing.T, cluster *vault.TestCluster) {
{
core := cluster.Cores[2]
core.UnderlyingRawStorage.(*raft.RaftBackend).SetServerAddressProvider(addressProvider)
_, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false)
_, err := core.JoinRaftCluster(namespace.RootContext(context.Background()), leaderAPI, leaderCore.TLSConfig, false, false)
if err != nil {
t.Fatal(err)
}
Expand Down
8 changes: 7 additions & 1 deletion http/sys_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package http
import (
"context"
"crypto/tls"
"errors"
"io"
"net/http"

Expand All @@ -29,6 +30,10 @@ func handleSysRaftJoinPost(core *vault.Core, w http.ResponseWriter, r *http.Requ
return
}

if req.NonVoter && !nonVotersAllowed {
respondError(w, http.StatusBadRequest, errors.New("non-voting nodes not allowed"))
}

var tlsConfig *tls.Config
var err error
if len(req.LeaderCACert) != 0 || len(req.LeaderClientCert) != 0 || len(req.LeaderClientKey) != 0 {
Expand All @@ -39,7 +44,7 @@ func handleSysRaftJoinPost(core *vault.Core, w http.ResponseWriter, r *http.Requ
}
}

joined, err := core.JoinRaftCluster(context.Background(), req.LeaderAPIAddr, tlsConfig, req.Retry)
joined, err := core.JoinRaftCluster(context.Background(), req.LeaderAPIAddr, tlsConfig, req.Retry, req.NonVoter)
if err != nil {
respondError(w, http.StatusInternalServerError, err)
return
Expand All @@ -61,4 +66,5 @@ type JoinRequest struct {
LeaderClientCert string `json:"leader_client_cert"`
LeaderClientKey string `json:"leader_client_key"`
Retry bool `json:"retry"`
NonVoter bool `json:"non_voter"`
}
2 changes: 2 additions & 0 deletions http/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,6 @@ var (
}

additionalRoutes = func(mux *http.ServeMux, core *vault.Core) {}

nonVotersAllowed = false
)
1 change: 0 additions & 1 deletion physical/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,6 @@ func (b *RaftBackend) AddPeer(ctx context.Context, peerID, clusterAddr string) e
b.logger.Debug("adding raft peer", "node_id", peerID, "cluster_addr", clusterAddr)

future := b.raft.AddVoter(raft.ServerID(peerID), raft.ServerAddress(clusterAddr), 0, 0)

return future.Error()
}

Expand Down
13 changes: 13 additions & 0 deletions physical/raft/raft_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
// +build !enterprise

package raft

import (
"context"
"errors"
)

// AddPeer adds a new server to the raft cluster
func (b *RaftBackend) AddNonVotingPeer(ctx context.Context, peerID, clusterAddr string) error {
return errors.New("not implemented")
}
26 changes: 13 additions & 13 deletions vault/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,13 @@ type unlockInformation struct {
Nonce string
}

type raftInformation struct {
challenge *physical.EncryptedBlobInfo
leaderClient *api.Client
leaderBarrierConfig *SealConfig
nonVoter bool
}

// Core is used as the central manager of Vault activity. It is the primary point of
// interface for API handlers and is responsible for managing the logical and physical
// backends, router, security barrier, and audit trails.
Expand Down Expand Up @@ -189,13 +196,9 @@ type Core struct {
// seal is our seal, for seal configuration information
seal Seal

raftUnseal bool

raftChallenge *physical.EncryptedBlobInfo

raftLeaderClient *api.Client

raftLeaderBarrierConfig *SealConfig
// raftInfo will contain information required for this node to join as a
// peer to an existing raft cluster
raftInfo *raftInformation

// migrationSeal is the seal to use during a migration operation. It is the
// seal we're migrating *from*.
Expand Down Expand Up @@ -923,14 +926,11 @@ func (c *Core) unseal(key []byte, useRecoveryKeys bool) (bool, error) {

// If we are in the middle of a raft join send the answer and wait for
// data to start streaming in.
if err := c.joinRaftSendAnswer(ctx, c.raftLeaderClient, c.raftChallenge, c.seal.GetAccess()); err != nil {
if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), c.raftInfo); err != nil {
return false, err
}
// Reset the state
c.raftUnseal = false
c.raftChallenge = nil
c.raftLeaderBarrierConfig = nil
c.raftLeaderClient = nil
c.raftInfo = nil

go func() {
keyringFound := false
Expand Down Expand Up @@ -1002,7 +1002,7 @@ func (c *Core) unsealPart(ctx context.Context, seal Seal, key []byte, useRecover
case c.isRaftUnseal():
// Ignore follower's seal config and refer to leader's barrier
// configuration.
config = c.raftLeaderBarrierConfig
config = c.raftInfo.leaderBarrierConfig
default:
config, err = seal.BarrierConfig(ctx)
}
Expand Down
2 changes: 1 addition & 1 deletion vault/external_tests/raft/raft_test.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package vault
package rafttests

import (
"bytes"
Expand Down
14 changes: 13 additions & 1 deletion vault/logical_system_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@ func (b *SystemBackend) raftStoragePaths() []*framework.Path {
"cluster_addr": {
Type: framework.TypeString,
},
"non_voter": {
Type: framework.TypeBool,
},
},

Operations: map[logical.Operation]framework.OperationHandler{
Expand Down Expand Up @@ -233,6 +236,8 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc
return logical.ErrorResponse("no cluster_addr provided"), logical.ErrInvalidRequest
}

nonVoter := d.Get("non_voter").(bool)

answer, err := base64.StdEncoding.DecodeString(answerRaw)
if err != nil {
return logical.ErrorResponse("could not base64 decode answer"), logical.ErrInvalidRequest
Expand Down Expand Up @@ -261,9 +266,16 @@ func (b *SystemBackend) handleRaftBootstrapAnswerWrite() framework.OperationFunc
return nil, errors.New("could not decode raft TLS configuration")
}

if err := raftStorage.AddPeer(ctx, serverID, clusterAddr); err != nil {
switch nonVoter {
case true:
err = raftStorage.AddNonVotingPeer(ctx, serverID, clusterAddr)
default:
err = raftStorage.AddPeer(ctx, serverID, clusterAddr)
}
if err != nil {
return nil, err
}

if b.Core.raftFollowerStates != nil {
b.Core.raftFollowerStates.update(serverID, 0)
}
Expand Down
29 changes: 16 additions & 13 deletions vault/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -524,7 +524,7 @@ func (c *Core) raftSnapshotRestoreCallback(grabLock bool, sealNode bool) func(co
}
}

func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig *tls.Config, retry bool) (bool, error) {
func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig *tls.Config, retry, nonVoter bool) (bool, error) {
if len(leaderAddr) == 0 {
return false, errors.New("No leader address provided")
}
Expand Down Expand Up @@ -603,17 +603,19 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig
if err := proto.Unmarshal(challengeRaw, eBlob); err != nil {
return errwrap.Wrapf("error decoding challenge: {{err}}", err)
}

raftInfo := &raftInformation{
challenge: eBlob,
leaderClient: apiClient,
leaderBarrierConfig: &sealConfig,
nonVoter: nonVoter,
}
if c.seal.BarrierType() == seal.Shamir {
c.raftUnseal = true
c.raftChallenge = eBlob
c.raftLeaderClient = apiClient
c.raftLeaderBarrierConfig = &sealConfig
c.raftInfo = raftInfo
c.seal.SetBarrierConfig(ctx, &sealConfig)
return nil
}

if err := c.joinRaftSendAnswer(ctx, apiClient, eBlob, c.seal.GetAccess()); err != nil {
if err := c.joinRaftSendAnswer(ctx, c.seal.GetAccess(), raftInfo); err != nil {
return errwrap.Wrapf("failed to send answer to leader node: {{err}}", err)
}

Expand Down Expand Up @@ -649,8 +651,8 @@ func (c *Core) JoinRaftCluster(ctx context.Context, leaderAddr string, tlsConfig
// This is used in tests to override the cluster address
var UpdateClusterAddrForTests uint32

func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client, challenge *physical.EncryptedBlobInfo, sealAccess seal.Access) error {
if challenge == nil {
func (c *Core) joinRaftSendAnswer(ctx context.Context, sealAccess seal.Access, raftInfo *raftInformation) error {
if raftInfo.challenge == nil {
return errors.New("raft challenge is nil")
}

Expand All @@ -663,7 +665,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client,
return errors.New("raft is already initialized")
}

plaintext, err := sealAccess.Decrypt(ctx, challenge)
plaintext, err := sealAccess.Decrypt(ctx, raftInfo.challenge)
if err != nil {
return errwrap.Wrapf("error decrypting challenge: {{err}}", err)
}
Expand All @@ -683,16 +685,17 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client,
}
}

answerReq := leaderClient.NewRequest("PUT", "/v1/sys/storage/raft/bootstrap/answer")
answerReq := raftInfo.leaderClient.NewRequest("PUT", "/v1/sys/storage/raft/bootstrap/answer")
if err := answerReq.SetJSONBody(map[string]interface{}{
"answer": base64.StdEncoding.EncodeToString(plaintext),
"cluster_addr": clusterAddr,
"server_id": raftStorage.NodeID(),
"non_voter": raftInfo.nonVoter,
}); err != nil {
return err
}

answerRespJson, err := leaderClient.RawRequestWithContext(ctx, answerReq)
answerRespJson, err := raftInfo.leaderClient.RawRequestWithContext(ctx, answerReq)
if answerRespJson != nil {
defer answerRespJson.Body.Close()
}
Expand Down Expand Up @@ -725,7 +728,7 @@ func (c *Core) joinRaftSendAnswer(ctx context.Context, leaderClient *api.Client,
}

func (c *Core) isRaftUnseal() bool {
return c.raftUnseal
return c.raftInfo != nil
}

type answerRespData struct {
Expand Down

0 comments on commit 68750b7

Please sign in to comment.