Skip to content

Commit

Permalink
agent/caching: support proxying request query parameters (#6772)
Browse files Browse the repository at this point in the history
* agent/caching: support proxying request query parameters

* update comment

* rejig other agent log messages to output method followed by path
  • Loading branch information
calvn authored and Brian Kassouf committed Jun 4, 2019
1 parent 32c9bdc commit 960845a
Show file tree
Hide file tree
Showing 5 changed files with 106 additions and 32 deletions.
7 changes: 6 additions & 1 deletion command/agent/cache/api_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,13 @@ func (ap *APIProxy) Send(ctx context.Context, req *SendRequest) (*SendResponse,
fwReq := client.NewRequest(req.Request.Method, req.Request.URL.Path)
fwReq.BodyBytes = req.RequestBody

query := req.Request.URL.Query()
if len(query) != 0 {
fwReq.Params = query
}

// Make the request to Vault and get the response
ap.logger.Info("forwarding request", "path", req.Request.URL.Path, "method", req.Request.Method)
ap.logger.Info("forwarding request", "method", req.Request.Method, "path", req.Request.URL.Path)

resp, err := client.RawRequestWithContext(ctx, fwReq)
if resp == nil && err != nil {
Expand Down
52 changes: 50 additions & 2 deletions command/agent/cache/api_proxy_test.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package cache

import (
"net/http"
"testing"

hclog "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/vault/api"
"github.com/hashicorp/vault/helper/jsonutil"
"github.com/hashicorp/vault/helper/logging"
Expand Down Expand Up @@ -42,6 +43,53 @@ func TestAPIProxy(t *testing.T) {
}

if !result.Initialized || result.Sealed || result.Standby {
t.Fatalf("bad sys/health response")
t.Fatalf("bad sys/health response: %#v", result)
}
}

func TestAPIProxy_queryParams(t *testing.T) {
// Set up an agent that points to a standby node for this particular test
// since it needs to proxy a /sys/health?standbyok=true request to a standby
cleanup, client, _, _ := setupClusterAndAgentOnStandby(namespace.RootContext(nil), t, nil)
defer cleanup()

proxier, err := NewAPIProxy(&APIProxyConfig{
Client: client,
Logger: logging.NewVaultLogger(hclog.Trace),
})
if err != nil {
t.Fatal(err)
}

r := client.NewRequest("GET", "/v1/sys/health")
req, err := r.ToHTTP()
if err != nil {
t.Fatal(err)
}

// Add a query parameter for testing
q := req.URL.Query()
q.Add("standbyok", "true")
req.URL.RawQuery = q.Encode()

resp, err := proxier.Send(namespace.RootContext(nil), &SendRequest{
Request: req,
})
if err != nil {
t.Fatal(err)
}

var result api.HealthResponse
err = jsonutil.DecodeJSONFromReader(resp.Response.Body, &result)
if err != nil {
t.Fatal(err)
}

if !result.Initialized || result.Sealed || !result.Standby {
t.Fatalf("bad sys/health response: %#v", result)
}

if resp.Response.StatusCode != http.StatusOK {
t.Fatalf("exptected standby to return 200, got: %v", resp.Response.StatusCode)
}
}
47 changes: 34 additions & 13 deletions command/agent/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,22 @@ path "*" {
`

// setupClusterAndAgent is a helper func used to set up a test cluster and
// caching agent. It returns a cleanup func that should be deferred immediately
// along with two clients, one for direct cluster communication and another to
// talk to the caching agent.
// caching agent against the active node. It returns a cleanup func that should
// be deferred immediately along with two clients, one for direct cluster
// communication and another to talk to the caching agent.
func setupClusterAndAgent(ctx context.Context, t *testing.T, coreConfig *vault.CoreConfig) (func(), *api.Client, *api.Client, *LeaseCache) {
return setupClusterAndAgentCommon(ctx, t, coreConfig, false)
}

// setupClusterAndAgentOnStandby is a helper func used to set up a test cluster
// and caching agent against a standby node. It returns a cleanup func that
// should be deferred immediately along with two clients, one for direct cluster
// communication and another to talk to the caching agent.
func setupClusterAndAgentOnStandby(ctx context.Context, t *testing.T, coreConfig *vault.CoreConfig) (func(), *api.Client, *api.Client, *LeaseCache) {
return setupClusterAndAgentCommon(ctx, t, coreConfig, true)
}

func setupClusterAndAgentCommon(ctx context.Context, t *testing.T, coreConfig *vault.CoreConfig, onStandby bool) (func(), *api.Client, *api.Client, *LeaseCache) {
t.Helper()

if ctx == nil {
Expand Down Expand Up @@ -70,21 +82,30 @@ func setupClusterAndAgent(ctx context.Context, t *testing.T, coreConfig *vault.C
cores := cluster.Cores
vault.TestWaitActive(t, cores[0].Core)

// clusterClient is the client that is used to talk directly to the cluster.
clusterClient := cores[0].Client
activeClient := cores[0].Client
standbyClient := cores[1].Client

// clienToUse is the client for the agent to point to.
clienToUse := activeClient
if onStandby {
clienToUse = standbyClient
}

// Add an admin policy
if err := clusterClient.Sys().PutPolicy("admin", policyAdmin); err != nil {
if err := activeClient.Sys().PutPolicy("admin", policyAdmin); err != nil {
t.Fatal(err)
}

// Set up the userpass auth backend and an admin user. Used for getting a token
// for the agent later down in this func.
clusterClient.Sys().EnableAuthWithOptions("userpass", &api.EnableAuthOptions{
err := activeClient.Sys().EnableAuthWithOptions("userpass", &api.EnableAuthOptions{
Type: "userpass",
})
if err != nil {
t.Fatal(err)
}

_, err := clusterClient.Logical().Write("auth/userpass/users/foo", map[string]interface{}{
_, err = activeClient.Logical().Write("auth/userpass/users/foo", map[string]interface{}{
"password": "bar",
"policies": []string{"admin"},
})
Expand All @@ -94,7 +115,7 @@ func setupClusterAndAgent(ctx context.Context, t *testing.T, coreConfig *vault.C

// Set up env vars for agent consumption
origEnvVaultAddress := os.Getenv(api.EnvVaultAddress)
os.Setenv(api.EnvVaultAddress, clusterClient.Address())
os.Setenv(api.EnvVaultAddress, clienToUse.Address())

origEnvVaultCACert := os.Getenv(api.EnvVaultCACert)
os.Setenv(api.EnvVaultCACert, fmt.Sprintf("%s/ca_cert.pem", cluster.TempDir))
Expand All @@ -108,7 +129,7 @@ func setupClusterAndAgent(ctx context.Context, t *testing.T, coreConfig *vault.C

// Create the API proxier
apiProxy, err := NewAPIProxy(&APIProxyConfig{
Client: clusterClient,
Client: clienToUse,
Logger: cacheLogger.Named("apiproxy"),
})
if err != nil {
Expand All @@ -118,7 +139,7 @@ func setupClusterAndAgent(ctx context.Context, t *testing.T, coreConfig *vault.C
// Create the lease cache proxier and set its underlying proxier to
// the API proxier.
leaseCache, err := NewLeaseCache(&LeaseCacheConfig{
Client: clusterClient,
Client: clienToUse,
BaseContext: ctx,
Proxier: apiProxy,
Logger: cacheLogger.Named("leasecache"),
Expand All @@ -142,7 +163,7 @@ func setupClusterAndAgent(ctx context.Context, t *testing.T, coreConfig *vault.C
go server.Serve(listener)

// testClient is the client that is used to talk to the agent for proxying/caching behavior.
testClient, err := clusterClient.Clone()
testClient, err := activeClient.Clone()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -171,7 +192,7 @@ func setupClusterAndAgent(ctx context.Context, t *testing.T, coreConfig *vault.C
listener.Close()
}

return cleanup, clusterClient, testClient, leaseCache
return cleanup, clienToUse, testClient, leaseCache
}

func tokenRevocationValidation(t *testing.T, sampleSpace map[string]string, expected map[string]string, leaseCache *LeaseCache) {
Expand Down
6 changes: 3 additions & 3 deletions command/agent/cache/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,11 @@ import (

func Handler(ctx context.Context, logger hclog.Logger, proxier Proxier, inmemSink sink.Sink) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
logger.Info("received request", "path", r.URL.Path, "method", r.Method)
logger.Info("received request", "method", r.Method, "path", r.URL.Path)

token := r.Header.Get(consts.AuthHeaderName)
if token == "" && inmemSink != nil {
logger.Debug("using auto auth token", "path", r.URL.Path, "method", r.Method)
logger.Debug("using auto auth token", "method", r.Method, "path", r.URL.Path)
token = inmemSink.(sink.SinkReader).Token()
}

Expand Down Expand Up @@ -148,7 +148,7 @@ func processTokenLookupResponse(ctx context.Context, logger hclog.Logger, inmemS
return nil
}

logger.Info("stripping auto-auth token from the response", "path", req.Request.URL.Path, "method", req.Request.Method)
logger.Info("stripping auto-auth token from the response", "method", req.Request.Method, "path", req.Request.URL.Path)
secret, err := api.ParseSecret(bytes.NewReader(resp.ResponseBody))
if err != nil {
return fmt.Errorf("failed to parse token lookup response: %v", err)
Expand Down
26 changes: 13 additions & 13 deletions command/agent/cache/lease_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,11 +202,11 @@ func (c *LeaseCache) Send(ctx context.Context, req *SendRequest) (*SendResponse,
// in between this upgrade so we can simply return that. Otherwise, this request
// will be the one performing the cache write.
if sendResp != nil {
c.logger.Debug("returning cached response", "path", req.Request.URL.Path)
c.logger.Debug("returning cached response", "method", req.Request.Method, "path", req.Request.URL.Path)
return sendResp, nil
}

c.logger.Debug("forwarding request", "path", req.Request.URL.Path, "method", req.Request.Method)
c.logger.Debug("forwarding request", "method", req.Request.Method, "path", req.Request.URL.Path)

// Pass the request down and get a response
resp, err := c.proxier.Send(ctx, req)
Expand Down Expand Up @@ -254,7 +254,7 @@ func (c *LeaseCache) Send(ctx context.Context, req *SendRequest) (*SendResponse,

// Fast path for responses with no secrets
if secret == nil {
c.logger.Debug("pass-through response; no secret in response", "path", req.Request.URL.Path, "method", req.Request.Method)
c.logger.Debug("pass-through response; no secret in response", "method", req.Request.Method, "path", req.Request.URL.Path)
return resp, nil
}

Expand All @@ -265,22 +265,22 @@ func (c *LeaseCache) Send(ctx context.Context, req *SendRequest) (*SendResponse,
return nil, err
}
if !secret.Renewable && !tokenRenewable {
c.logger.Debug("pass-through response; secret not renewable", "path", req.Request.URL.Path, "method", req.Request.Method)
c.logger.Debug("pass-through response; secret not renewable", "method", req.Request.Method, "path", req.Request.URL.Path)
return resp, nil
}

var renewCtxInfo *cachememdb.ContextInfo
switch {
case secret.LeaseID != "":
c.logger.Debug("processing lease response", "path", req.Request.URL.Path, "method", req.Request.Method)
c.logger.Debug("processing lease response", "method", req.Request.Method, "path", req.Request.URL.Path)
entry, err := c.db.Get(cachememdb.IndexNameToken, req.Token)
if err != nil {
return nil, err
}
// If the lease belongs to a token that is not managed by the agent,
// return the response without caching it.
if entry == nil {
c.logger.Debug("pass-through lease response; token not managed by agent", "path", req.Request.URL.Path, "method", req.Request.Method)
c.logger.Debug("pass-through lease response; token not managed by agent", "method", req.Request.Method, "path", req.Request.URL.Path)
return resp, nil
}

Expand All @@ -291,7 +291,7 @@ func (c *LeaseCache) Send(ctx context.Context, req *SendRequest) (*SendResponse,
index.LeaseToken = req.Token

case secret.Auth != nil:
c.logger.Debug("processing auth response", "path", req.Request.URL.Path, "method", req.Request.Method)
c.logger.Debug("processing auth response", "method", req.Request.Method, "path", req.Request.URL.Path)

// Check if this token creation request resulted in a non-orphan token, and if so
// correctly set the parentCtx to the request's token context.
Expand All @@ -304,11 +304,11 @@ func (c *LeaseCache) Send(ctx context.Context, req *SendRequest) (*SendResponse,
// If parent token is not managed by the agent, child shouldn't be
// either.
if entry == nil {
c.logger.Debug("pass-through auth response; parent token not managed by agent", "path", req.Request.URL.Path, "method", req.Request.Method)
c.logger.Debug("pass-through auth response; parent token not managed by agent", "method", req.Request.Method, "path", req.Request.URL.Path)
return resp, nil
}

c.logger.Debug("setting parent context", "path", req.Request.URL.Path, "method", req.Request.Method)
c.logger.Debug("setting parent context", "method", req.Request.Method, "path", req.Request.URL.Path)
parentCtx = entry.RenewCtxInfo.Ctx

entry.TokenParent = req.Token
Expand All @@ -321,7 +321,7 @@ func (c *LeaseCache) Send(ctx context.Context, req *SendRequest) (*SendResponse,
default:
// We shouldn't be hitting this, but will err on the side of caution and
// simply proxy.
c.logger.Debug("pass-through response; secret without lease and token", "path", req.Request.URL.Path, "method", req.Request.Method)
c.logger.Debug("pass-through response; secret without lease and token", "method", req.Request.Method, "path", req.Request.URL.Path)
return resp, nil
}

Expand Down Expand Up @@ -353,7 +353,7 @@ func (c *LeaseCache) Send(ctx context.Context, req *SendRequest) (*SendResponse,
}

// Store the index in the cache
c.logger.Debug("storing response into the cache", "path", req.Request.URL.Path, "method", req.Request.Method)
c.logger.Debug("storing response into the cache", "method", req.Request.Method, "path", req.Request.URL.Path)
err = c.db.Set(index)
if err != nil {
c.logger.Error("failed to cache the proxied response", "error", err)
Expand All @@ -378,7 +378,7 @@ func (c *LeaseCache) createCtxInfo(ctx context.Context) *cachememdb.ContextInfo
func (c *LeaseCache) startRenewing(ctx context.Context, index *cachememdb.Index, req *SendRequest, secret *api.Secret) {
defer func() {
id := ctx.Value(contextIndexID).(string)
c.logger.Debug("evicting index from cache", "id", id, "path", req.Request.URL.Path, "method", req.Request.Method)
c.logger.Debug("evicting index from cache", "id", id, "method", req.Request.Method, "path", req.Request.URL.Path)
err := c.db.Evict(cachememdb.IndexNameID, id)
if err != nil {
c.logger.Error("failed to evict index", "id", id, "error", err)
Expand All @@ -402,7 +402,7 @@ func (c *LeaseCache) startRenewing(ctx context.Context, index *cachememdb.Index,
return
}

c.logger.Debug("initiating renewal", "path", req.Request.URL.Path, "method", req.Request.Method)
c.logger.Debug("initiating renewal", "method", req.Request.Method, "path", req.Request.URL.Path)
go renewer.Renew()
defer renewer.Stop()

Expand Down

0 comments on commit 960845a

Please sign in to comment.