Fix multiple issues related to proxycfg health queries.
1. The datacenter was not being provided to a proxycfg query, which resulted in
bypassing agentless query optimizations and using the normal API instead.

2. The health RPC endpoint would return a zero index when insufficient ACL
permissions were detected. This would result in the agent cache performing an
infinite loop of queries in rapid succession without backoff.
hashi-derek committed May 8, 2023
1 parent 166d7a3 commit 3301b8c
Showing 4 changed files with 37 additions and 32 deletions.
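
For context on the second fix: a blocking-query client feeds the Index from each reply back into its next request as MinQueryIndex, and the server only waits when that index is ahead of its current state. The sketch below (assumed names, not Consul's actual agent cache code) shows why a zero index degenerates into a hot loop.

package main

import "fmt"

// result stands in for a reply carrying QueryMeta.Index.
type result struct {
	Index uint64 // index the client should block on in its next request
}

// fetch stands in for an RPC such as Health.ServiceNodes. Before this fix,
// an ACL-filtered reply came back with Index == 0.
func fetch(minQueryIndex uint64) result {
	if minQueryIndex == 0 {
		// Nothing to wait on: the server answers immediately.
		return result{Index: 0}
	}
	// ...a real server would block here until its index passed minQueryIndex...
	return result{Index: minQueryIndex}
}

func main() {
	var lastIndex uint64
	for i := 0; i < 3; i++ { // bounded here; the real cache loops indefinitely
		res := fetch(lastIndex)
		lastIndex = res.Index // stays 0, so the next query cannot block
		fmt.Println("re-issuing query with MinQueryIndex =", lastIndex)
	}
}

With the fix, the filtered reply carries the store's current index, so the cache's next query blocks until something actually changes.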
42 changes: 22 additions & 20 deletions agent/consul/health_endpoint.go
@@ -214,6 +214,16 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
 		f = h.serviceNodesDefault
 	}
 
+	filter, err := bexpr.CreateFilter(args.Filter, nil, reply.Nodes)
+	if err != nil {
+		return err
+	}
+
+	var (
+		priorMergeHash uint64
+		ranMergeOnce   bool
+	)
+
 	authzContext := acl.AuthorizerContext{
 		Peer: args.PeerName,
 	}
@@ -226,26 +236,6 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
 		return err
 	}
 
-	// If we're doing a connect or ingress query, we need read access to the service
-	// we're trying to find proxies for, so check that.
-	if args.Connect || args.Ingress {
-		// TODO(acl-error-enhancements) Look for ways to percolate this information up to give any feedback to the user.
-		if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
-			// Just return nil, which will return an empty response (tested)
-			return nil
-		}
-	}
-
-	filter, err := bexpr.CreateFilter(args.Filter, nil, reply.Nodes)
-	if err != nil {
-		return err
-	}
-
-	var (
-		priorMergeHash uint64
-		ranMergeOnce   bool
-	)
-
 	err = h.srv.blockingQuery(
 		&args.QueryOptions,
 		&reply.QueryMeta,
@@ -257,6 +247,18 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
 				return err
 			}
 
+			// If we're doing a connect or ingress query, we need read access to the service
+			// we're trying to find proxies for, so check that.
+			if args.Connect || args.Ingress {
+				// TODO(acl-error-enhancements) Look for ways to percolate this information up to give any feedback to the user.
+				if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
+					// Just return nil, which will return an empty response (tested)
+					// We should be careful to set the index to prevent a busy loop from triggering.
+					reply.Index = index
+					return nil
+				}
+			}
+
 			resolvedNodes := nodes
 			if args.MergeCentralConfig {
 				for _, node := range resolvedNodes {
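The change above works because of the contract the blockingQuery helper imposes: the inner query function is handed the store's current index and is expected to stamp it on the reply on every return path, including early returns. A distilled sketch of that contract (hypothetical names and a simplified signature, not the real helper):

package sketch

// indexedReply is a stand-in for a reply type such as structs.IndexedCheckServiceNodes.
type indexedReply struct {
	Index uint64
	Nodes []string
}

// queryFn mirrors the shape of the callback handed to blockingQuery.
func queryFn(index uint64, authorized bool, reply *indexedReply) error {
	if !authorized {
		// Empty (ACL-filtered) result, but still advance the index so the
		// agent cache blocks on the next round instead of spinning.
		reply.Index = index
		return nil
	}
	reply.Nodes = []string{"node-a", "node-b"} // normal query path
	reply.Index = index
	return nil
}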
1 change: 1 addition & 0 deletions agent/consul/health_endpoint_test.go
@@ -1127,6 +1127,7 @@ node "foo" {
 	var resp structs.IndexedCheckServiceNodes
 	assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &resp))
 	assert.Len(t, resp.Nodes, 0)
+	assert.Greater(t, resp.Index, uint64(0))
 
 	// List w/ token. This should work since we're requesting "foo", but should
 	// also only contain the proxies with names that adhere to our ACL.
2 changes: 1 addition & 1 deletion agent/proxycfg/state_test.go
@@ -795,7 +795,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 				fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-remote", "", "dc2", true),
 				fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-local", "", "dc2", true),
 				fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-direct", "", "dc2", true),
-				upstreamPeerWatchIDPrefix + fmt.Sprintf("%s-failover-to-peer?peer=cluster-01", apiUID.String()): genVerifyServiceSpecificPeeredRequest("api-failover-to-peer", "", "", "cluster-01", true),
+				upstreamPeerWatchIDPrefix + fmt.Sprintf("%s-failover-to-peer?peer=cluster-01", apiUID.String()): genVerifyServiceSpecificPeeredRequest("api-failover-to-peer", "", "dc1", "cluster-01", true),
 				fmt.Sprintf("mesh-gateway:dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc2"),
 				fmt.Sprintf("mesh-gateway:dc1:%s-failover-local?dc=dc2", apiUID.String()): genVerifyGatewayWatch("dc1"),
 			},
24 changes: 13 additions & 11 deletions agent/proxycfg/upstreams.go
@@ -316,8 +316,19 @@ func (s *handlerUpstreams) resetWatchesFromChain(
 			watchedChainEndpoints = true
 		}
 
-		opts := targetWatchOpts{upstreamID: uid}
-		opts.fromChainTarget(target)
+		opts := targetWatchOpts{
+			upstreamID: uid,
+			chainID:    target.ID,
+			service:    target.Service,
+			filter:     target.Subset.Filter,
+			datacenter: target.Datacenter,
+			peer:       target.Peer,
+			entMeta:    target.GetEnterpriseMetadata(),
+		}
+		// Peering targets do not set the datacenter field, so we should default it here.
+		if opts.datacenter == "" {
+			opts.datacenter = s.source.Datacenter
+		}
 
 		err := s.watchUpstreamTarget(ctx, snap, opts)
 		if err != nil {
@@ -435,15 +446,6 @@ type targetWatchOpts struct {
 	entMeta    *acl.EnterpriseMeta
 }
 
-func (o *targetWatchOpts) fromChainTarget(t *structs.DiscoveryTarget) {
-	o.chainID = t.ID
-	o.service = t.Service
-	o.filter = t.Subset.Filter
-	o.datacenter = t.Datacenter
-	o.peer = t.Peer
-	o.entMeta = t.GetEnterpriseMetadata()
-}
-
 func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error {
 	s.logger.Trace("initializing watch of target",
 		"upstream", opts.upstreamID,
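The inlined options above also carry the first fix: peering targets leave the target's Datacenter empty, so the watch now falls back to the proxy's own datacenter instead of sending an empty value, which is what keeps the query on the agentless-optimized path rather than the normal API. Restated as a tiny hypothetical helper:

package sketch

// effectiveDatacenter restates the fallback added in resetWatchesFromChain:
// an empty target datacenter (the peering case) defaults to the source
// proxy's datacenter.
func effectiveDatacenter(targetDC, sourceDC string) string {
	if targetDC == "" {
		return sourceDC
	}
	return targetDC
}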
