From 3301b8cdc3334a5d39d07f6e058d0424faa2b8d8 Mon Sep 17 00:00:00 2001
From: Derek Menteer
Date: Mon, 8 May 2023 11:22:19 -0500
Subject: [PATCH] Fix multiple issues related to proxycfg health queries.

1. The datacenter was not being provided to a proxycfg query, which resulted
   in bypassing agentless query optimizations and using the normal API instead.

2. The health rpc endpoint would return a zero index when insufficient ACLs
   were detected. This would result in the agent cache performing an infinite
   loop of queries in rapid succession without backoff.
---
 agent/consul/health_endpoint.go      | 42 +++++++++++++++-------------
 agent/consul/health_endpoint_test.go |  1 +
 agent/proxycfg/state_test.go         |  2 +-
 agent/proxycfg/upstreams.go          | 24 ++++++++--------
 4 files changed, 37 insertions(+), 32 deletions(-)

diff --git a/agent/consul/health_endpoint.go b/agent/consul/health_endpoint.go
index abf17adbaeb10..a92d479be7287 100644
--- a/agent/consul/health_endpoint.go
+++ b/agent/consul/health_endpoint.go
@@ -214,6 +214,16 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
 		f = h.serviceNodesDefault
 	}
 
+	filter, err := bexpr.CreateFilter(args.Filter, nil, reply.Nodes)
+	if err != nil {
+		return err
+	}
+
+	var (
+		priorMergeHash uint64
+		ranMergeOnce   bool
+	)
+
 	authzContext := acl.AuthorizerContext{
 		Peer: args.PeerName,
 	}
@@ -226,26 +236,6 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
 		return err
 	}
 
-	// If we're doing a connect or ingress query, we need read access to the service
-	// we're trying to find proxies for, so check that.
-	if args.Connect || args.Ingress {
-		// TODO(acl-error-enhancements) Look for ways to percolate this information up to give any feedback to the user.
-		if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
-			// Just return nil, which will return an empty response (tested)
-			return nil
-		}
-	}
-
-	filter, err := bexpr.CreateFilter(args.Filter, nil, reply.Nodes)
-	if err != nil {
-		return err
-	}
-
-	var (
-		priorMergeHash uint64
-		ranMergeOnce   bool
-	)
-
 	err = h.srv.blockingQuery(
 		&args.QueryOptions,
 		&reply.QueryMeta,
@@ -257,6 +247,18 @@ func (h *Health) ServiceNodes(args *structs.ServiceSpecificRequest, reply *struc
 				return err
 			}
 
+			// If we're doing a connect or ingress query, we need read access to the service
+			// we're trying to find proxies for, so check that.
+			if args.Connect || args.Ingress {
+				// TODO(acl-error-enhancements) Look for ways to percolate this information up to give any feedback to the user.
+				if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow {
+					// Just return nil, which will return an empty response (tested)
+					// We should be careful to set the index to prevent a busy loop from triggering.
+					reply.Index = index
+					return nil
+				}
+			}
+
 			resolvedNodes := nodes
 			if args.MergeCentralConfig {
 				for _, node := range resolvedNodes {
diff --git a/agent/consul/health_endpoint_test.go b/agent/consul/health_endpoint_test.go
index 4b492081c9307..1981332d86b44 100644
--- a/agent/consul/health_endpoint_test.go
+++ b/agent/consul/health_endpoint_test.go
@@ -1127,6 +1127,7 @@ node "foo" {
 	var resp structs.IndexedCheckServiceNodes
 	assert.Nil(t, msgpackrpc.CallWithCodec(codec, "Health.ServiceNodes", &req, &resp))
 	assert.Len(t, resp.Nodes, 0)
+	assert.Greater(t, resp.Index, uint64(0))
 
 	// List w/ token. This should work since we're requesting "foo", but should
 	// also only contain the proxies with names that adhere to our ACL.
diff --git a/agent/proxycfg/state_test.go b/agent/proxycfg/state_test.go
index aa11dc43f5420..792367b473be7 100644
--- a/agent/proxycfg/state_test.go
+++ b/agent/proxycfg/state_test.go
@@ -795,7 +795,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
 					fmt.Sprintf("upstream-target:api-failover-remote.default.default.dc2:%s-failover-remote?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-remote", "", "dc2", true),
 					fmt.Sprintf("upstream-target:api-failover-local.default.default.dc2:%s-failover-local?dc=dc2", apiUID.String()):   genVerifyServiceSpecificRequest("api-failover-local", "", "dc2", true),
 					fmt.Sprintf("upstream-target:api-failover-direct.default.default.dc2:%s-failover-direct?dc=dc2", apiUID.String()): genVerifyServiceSpecificRequest("api-failover-direct", "", "dc2", true),
-					upstreamPeerWatchIDPrefix + fmt.Sprintf("%s-failover-to-peer?peer=cluster-01", apiUID.String()):                   genVerifyServiceSpecificPeeredRequest("api-failover-to-peer", "", "", "cluster-01", true),
+					upstreamPeerWatchIDPrefix + fmt.Sprintf("%s-failover-to-peer?peer=cluster-01", apiUID.String()):                   genVerifyServiceSpecificPeeredRequest("api-failover-to-peer", "", "dc1", "cluster-01", true),
 					fmt.Sprintf("mesh-gateway:dc2:%s-failover-remote?dc=dc2", apiUID.String()):                                        genVerifyGatewayWatch("dc2"),
 					fmt.Sprintf("mesh-gateway:dc1:%s-failover-local?dc=dc2", apiUID.String()):                                         genVerifyGatewayWatch("dc1"),
 				},
diff --git a/agent/proxycfg/upstreams.go b/agent/proxycfg/upstreams.go
index d7fee4a3fcc16..5e42072fac326 100644
--- a/agent/proxycfg/upstreams.go
+++ b/agent/proxycfg/upstreams.go
@@ -316,8 +316,19 @@ func (s *handlerUpstreams) resetWatchesFromChain(
 			watchedChainEndpoints = true
 		}
 
-		opts := targetWatchOpts{upstreamID: uid}
-		opts.fromChainTarget(target)
+		opts := targetWatchOpts{
+			upstreamID: uid,
+			chainID:    target.ID,
+			service:    target.Service,
+			filter:     target.Subset.Filter,
+			datacenter: target.Datacenter,
+			peer:       target.Peer,
+			entMeta:    target.GetEnterpriseMetadata(),
+		}
+		// Peering targets do not set the datacenter field, so we should default it here.
+		if opts.datacenter == "" {
+			opts.datacenter = s.source.Datacenter
+		}
 
 		err := s.watchUpstreamTarget(ctx, snap, opts)
 		if err != nil {
@@ -435,15 +446,6 @@ type targetWatchOpts struct {
 	entMeta    *acl.EnterpriseMeta
 }
 
-func (o *targetWatchOpts) fromChainTarget(t *structs.DiscoveryTarget) {
-	o.chainID = t.ID
-	o.service = t.Service
-	o.filter = t.Subset.Filter
-	o.datacenter = t.Datacenter
-	o.peer = t.Peer
-	o.entMeta = t.GetEnterpriseMetadata()
-}
-
 func (s *handlerUpstreams) watchUpstreamTarget(ctx context.Context, snap *ConfigSnapshotUpstreams, opts targetWatchOpts) error {
 	s.logger.Trace("initializing watch of target",
 		"upstream", opts.upstreamID,
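
Context for point 2 of the commit message: the agent cache re-issues a blocking query using the index carried by the previous reply, so a reply with Index == 0 gives it nothing to wait on and it refetches immediately. The Go sketch below is a simplified, hypothetical model of that interaction; the type and function names (indexedReply, serviceNodes, queriesWithin) are illustrative stand-ins and not Consul's actual cache or RPC code. The real fix is just the `reply.Index = index` assignment in the early ACL return above, asserted by the new `assert.Greater(t, resp.Index, uint64(0))` test.

package main

import (
	"fmt"
	"time"
)

// indexedReply is a hypothetical stand-in for an RPC reply such as
// structs.IndexedCheckServiceNodes: it carries the raft index that the
// client uses for its next blocking query.
type indexedReply struct {
	Index uint64
}

// serviceNodes sketches the server side. When ACLs filter out the requested
// service it returns an empty reply; the bug was leaving Index at zero, and
// the fix is to copy the current raft index into the reply before returning.
func serviceNodes(raftIndex uint64, setIndexOnDeny bool) indexedReply {
	if setIndexOnDeny {
		return indexedReply{Index: raftIndex} // patched behaviour
	}
	return indexedReply{} // buggy behaviour: Index == 0
}

// queriesWithin sketches the client side: a cache loop that re-issues a
// blocking query keyed on the index from the previous reply. It returns the
// number of requests sent during the window.
func queriesWithin(window time.Duration, setIndexOnDeny bool) int {
	const raftIndex = 42
	var lastIndex uint64
	deadline := time.Now().Add(window)
	requests := 0
	for time.Now().Before(deadline) {
		reply := serviceNodes(raftIndex, setIndexOnDeny)
		requests++
		if reply.Index <= lastIndex {
			// Nothing to block on, so the loop re-queries immediately.
			// This is the rapid-fire behaviour described in the commit message.
			continue
		}
		lastIndex = reply.Index
		// A non-zero, advancing index means the next query can block
		// server-side until something changes; simulate that long wait here.
		time.Sleep(window)
	}
	return requests
}

func main() {
	window := 50 * time.Millisecond
	fmt.Println("requests with zero index:    ", queriesWithin(window, false))
	fmt.Println("requests with non-zero index:", queriesWithin(window, true))
}

Seeding the denied/empty reply with the current index (rather than zero) lets it participate in normal blocking-query backoff, which is why only the one-line assignment is needed on the server side.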