diff --git a/helper/testhelpers/testhelpers.go b/helper/testhelpers/testhelpers.go index 61d2a219ab8b..3f95f25bac60 100644 --- a/helper/testhelpers/testhelpers.go +++ b/helper/testhelpers/testhelpers.go @@ -453,3 +453,18 @@ func WaitForNCoresSealed(t testing.T, cluster *vault.TestCluster, n int) { t.Fatalf("%d cores were not sealed", n) } + +func WaitForActiveNode(t testing.T, cluster *vault.TestCluster) *vault.TestClusterCore { + for i := 0; i < 10; i++ { + for _, core := range cluster.Cores { + if standby, _ := core.Core.Standby(); !standby { + return core + } + } + + time.Sleep(time.Second) + } + + t.Fatalf("node did not become active") + return nil +} diff --git a/vault/cluster.go b/vault/cluster.go index 00445f825586..4ea6a446b39f 100644 --- a/vault/cluster.go +++ b/vault/cluster.go @@ -293,7 +293,7 @@ type ClusterHandler interface { CALookup(context.Context) (*x509.Certificate, error) // Handoff is used to pass the connection lifetime off to - // the storage backend + // the handler Handoff(context.Context, *sync.WaitGroup, chan struct{}, *tls.Conn) error Stop() error } @@ -366,6 +366,7 @@ func (cl *ClusterListener) TLSConfig(ctx context.Context) (*tls.Config, error) { } } + cl.logger.Warn("no TLS certs found for ALPN", "ALPN", clientHello.SupportedProtos) return nil, errors.New("unsupported protocol") } @@ -381,6 +382,7 @@ func (cl *ClusterListener) TLSConfig(ctx context.Context) (*tls.Config, error) { } } + cl.logger.Warn("no client information found") return nil, errors.New("no client cert found") } @@ -412,6 +414,7 @@ func (cl *ClusterListener) TLSConfig(ctx context.Context) (*tls.Config, error) { } } + cl.logger.Warn("no TLS config found for ALPN", "ALPN", clientHello.SupportedProtos) return nil, errors.New("unsupported protocol") } diff --git a/vault/init.go b/vault/init.go index c4ad07ecf028..b996a1d459c7 100644 --- a/vault/init.go +++ b/vault/init.go @@ -6,6 +6,7 @@ import ( "encoding/hex" "errors" "fmt" + "sync/atomic" "github.com/hashicorp/errwrap" "github.com/hashicorp/vault/helper/namespace" @@ -30,7 +31,8 @@ type InitResult struct { } var ( - initPTFunc = func(c *Core) func() { return nil } + initPTFunc = func(c *Core) func() { return nil } + initInProgress uint32 ) // Initialized checks if the Vault is already initialized @@ -97,6 +99,8 @@ func (c *Core) generateShares(sc *SealConfig) ([]byte, [][]byte, error) { // Initialize is used to initialize the Vault with the given // configurations. func (c *Core) Initialize(ctx context.Context, initParams *InitParams) (*InitResult, error) { + atomic.StoreUint32(&initInProgress, 1) + defer atomic.StoreUint32(&initInProgress, 0) barrierConfig := initParams.BarrierConfig recoveryConfig := initParams.RecoveryConfig diff --git a/vault/request_forwarding.go b/vault/request_forwarding.go index ad8c6d42fccf..da743cf4d035 100644 --- a/vault/request_forwarding.go +++ b/vault/request_forwarding.go @@ -189,6 +189,8 @@ func (rf *requestForwardingHandler) Handoff(ctx context.Context, shutdownWg *syn // Stop stops the request forwarding server and closes connections. func (rf *requestForwardingHandler) Stop() error { + // Give some time for existing RPCs to drain. + time.Sleep(clusterListenerAcceptDeadline) close(rf.stopCh) rf.fwRPCServer.Stop() return nil diff --git a/vault/request_forwarding_rpc.go b/vault/request_forwarding_rpc.go index 24adfac663fd..9a27adde90ae 100644 --- a/vault/request_forwarding_rpc.go +++ b/vault/request_forwarding_rpc.go @@ -61,7 +61,9 @@ func (s *forwardedRequestRPCServer) ForwardRequest(ctx context.Context, freq *fo } } - resp.LastRemoteWal = LastRemoteWAL(s.core) + // Performance standby nodes will use this value to do wait for WALs to ship + // in order to do a best-effort read after write gurantee + resp.LastRemoteWal = LastWAL(s.core) return resp, nil }