-
Notifications
You must be signed in to change notification settings - Fork 4.4k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Fix issue with streaming service health watches. (#17775)
Fix issue with streaming service health watches. This commit fixes an issue where the health streams were unaware of service export changes. Whenever an exported-services config entry is modified, it is effectively an ACL change. The bug would be triggered by the following situation: (1) no services are exported; (2) an upstream watch to service X is spawned; (3) the streaming backend filters out data for service X (due to lack of exports); (4) service X is finally exported. In the situation above, the streaming backend does not trigger a refresh of its data. This means that any events that were supposed to have been received prior to the export are NOT backfilled, and the watches never see service X spawning. We have currently decided not to trigger a stream refresh in this situation due to the potential for a thundering herd effect (touching exports would potentially cause a re-fetch of all watches for that partition). Therefore, a local blocking-query approach was added by this commit for agentless. It's also worth noting that the streaming subscription is currently bypassed most of the time with agentful, because proxycfg has a `req.Source.Node != ""` condition, which prevents the `streamingEnabled` check from passing. This means that while agents should technically have this same issue, they don't experience it with mesh health watches. Note that this is a temporary fix that solves the issue for proxycfg, but not service-discovery use cases.
- Loading branch information
1 parent
c7d9075
commit 22ce263
Showing
5 changed files
with
366 additions
and
4 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
```release-note:bug | ||
connect: Fix issue where changes to service exports were not reflected in proxies. | ||
``` |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,164 @@ | ||
// Copyright (c) HashiCorp, Inc. | ||
// SPDX-License-Identifier: MPL-2.0 | ||
|
||
package proxycfgglue | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"time" | ||
|
||
"github.com/hashicorp/go-bexpr" | ||
"github.com/hashicorp/go-memdb" | ||
|
||
"github.com/hashicorp/consul/acl" | ||
"github.com/hashicorp/consul/agent/consul/state" | ||
"github.com/hashicorp/consul/agent/consul/watch" | ||
"github.com/hashicorp/consul/agent/proxycfg" | ||
"github.com/hashicorp/consul/agent/structs" | ||
"github.com/hashicorp/consul/agent/structs/aclfilter" | ||
) | ||
|
||
// ServerHealthBlocking exists due to a bug with the streaming backend and its interaction with ACLs. | ||
// Whenever an exported-services config entry is modified, this is effectively an ACL change. | ||
// Assume the following situation: | ||
// - no services are exported | ||
// - an upstream watch to service X is spawned | ||
// - the streaming backend filters out data for service X (because it's not exported yet) | ||
// - service X is finally exported | ||
// | ||
// In this situation, the streaming backend does not trigger a refresh of its data. | ||
// This means that any events that were supposed to have been received prior to the export are NOT backfilled, | ||
// and the watches never see service X spawning. | ||
// | ||
// We currently have decided to not trigger a stream refresh in this situation due to the potential for a | ||
// thundering herd effect (touching exports would cause a re-fetch of all watches for that partition, potentially). | ||
// Therefore, this local blocking-query approach exists for agentless. | ||
// | ||
// It's also worth noting that the streaming subscription is currently bypassed most of the time with agentful, | ||
// because proxycfg has a `req.Source.Node != ""` which prevents the `streamingEnabled` check from passing. | ||
// This means that while agents should technically have this same issue, they don't experience it with mesh health | ||
// watches. | ||
func ServerHealthBlocking(deps ServerDataSourceDeps, remoteSource proxycfg.Health, state *state.Store) *serverHealthBlocking { | ||
return &serverHealthBlocking{deps, remoteSource, state, 5 * time.Minute} | ||
} | ||
|
||
type serverHealthBlocking struct { | ||
deps ServerDataSourceDeps | ||
remoteSource proxycfg.Health | ||
state *state.Store | ||
watchTimeout time.Duration | ||
} | ||
|
||
// Notify is mostly a copy of the function in `agent/consul/health_endpoint.go` with a few minor tweaks. | ||
// Most notably, some query features unnecessary for mesh have been stripped out. | ||
func (h *serverHealthBlocking) Notify(ctx context.Context, args *structs.ServiceSpecificRequest, correlationID string, ch chan<- proxycfg.UpdateEvent) error { | ||
if args.Datacenter != h.deps.Datacenter { | ||
return h.remoteSource.Notify(ctx, args, correlationID, ch) | ||
} | ||
|
||
// Verify the arguments | ||
if args.ServiceName == "" { | ||
return fmt.Errorf("Must provide service name") | ||
} | ||
if args.EnterpriseMeta.PartitionOrDefault() == acl.WildcardName { | ||
return fmt.Errorf("Wildcards are not allowed in the partition field") | ||
} | ||
|
||
// Determine the function we'll call | ||
var f func(memdb.WatchSet, *state.Store, *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) | ||
switch { | ||
case args.Connect: | ||
f = serviceNodesConnect | ||
case args.Ingress: | ||
f = serviceNodesIngress | ||
default: | ||
f = serviceNodesDefault | ||
} | ||
|
||
filter, err := bexpr.CreateFilter(args.Filter, nil, structs.CheckServiceNode{}) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
var hadResults bool = false | ||
return watch.ServerLocalNotify(ctx, correlationID, h.deps.GetStore, | ||
func(ws memdb.WatchSet, store Store) (uint64, *structs.IndexedCheckServiceNodes, error) { | ||
// This is necessary so that service export changes are eventually picked up, since | ||
// they won't trigger the watch themselves. | ||
timeoutCh := make(chan struct{}) | ||
time.AfterFunc(h.watchTimeout, func() { | ||
close(timeoutCh) | ||
}) | ||
ws.Add(timeoutCh) | ||
|
||
authzContext := acl.AuthorizerContext{ | ||
Peer: args.PeerName, | ||
} | ||
authz, err := h.deps.ACLResolver.ResolveTokenAndDefaultMeta(args.Token, &args.EnterpriseMeta, &authzContext) | ||
if err != nil { | ||
return 0, nil, err | ||
} | ||
// If we're doing a connect or ingress query, we need read access to the service | ||
// we're trying to find proxies for, so check that. | ||
if args.Connect || args.Ingress { | ||
if authz.ServiceRead(args.ServiceName, &authzContext) != acl.Allow { | ||
// If access was somehow revoked (via token deletion or unexporting), then we clear the | ||
// last-known results before triggering an error. This way, the proxies will actually update | ||
// their data, rather than holding onto the last-known list of healthy nodes indefinitely. | ||
if hadResults { | ||
hadResults = false | ||
return 0, &structs.IndexedCheckServiceNodes{}, watch.ErrorACLResetData | ||
} | ||
return 0, nil, acl.ErrPermissionDenied | ||
} | ||
} | ||
|
||
var thisReply structs.IndexedCheckServiceNodes | ||
thisReply.Index, thisReply.Nodes, err = f(ws, h.state, args) | ||
if err != nil { | ||
return 0, nil, err | ||
} | ||
|
||
raw, err := filter.Execute(thisReply.Nodes) | ||
if err != nil { | ||
return 0, nil, err | ||
} | ||
thisReply.Nodes = raw.(structs.CheckServiceNodes) | ||
|
||
// Note: we filter the results with ACLs *after* applying the user-supplied | ||
// bexpr filter, to ensure QueryMeta.ResultsFilteredByACLs does not include | ||
// results that would be filtered out even if the user did have permission. | ||
if err := h.filterACL(&authzContext, args.Token, &thisReply); err != nil { | ||
return 0, nil, err | ||
} | ||
|
||
hadResults = true | ||
return thisReply.Index, &thisReply, nil | ||
}, | ||
dispatchBlockingQueryUpdate[*structs.IndexedCheckServiceNodes](ch), | ||
) | ||
} | ||
|
||
func (h *serverHealthBlocking) filterACL(authz *acl.AuthorizerContext, token string, subj *structs.IndexedCheckServiceNodes) error { | ||
// Get the ACL from the token | ||
var entMeta acl.EnterpriseMeta | ||
authorizer, err := h.deps.ACLResolver.ResolveTokenAndDefaultMeta(token, &entMeta, authz) | ||
if err != nil { | ||
return err | ||
} | ||
aclfilter.New(authorizer, h.deps.Logger).Filter(subj) | ||
return nil | ||
} | ||
|
||
func serviceNodesConnect(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) { | ||
return s.CheckConnectServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName) | ||
} | ||
|
||
func serviceNodesIngress(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) { | ||
return s.CheckIngressServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta) | ||
} | ||
|
||
func serviceNodesDefault(ws memdb.WatchSet, s *state.Store, args *structs.ServiceSpecificRequest) (uint64, structs.CheckServiceNodes, error) { | ||
return s.CheckServiceNodes(ws, args.ServiceName, &args.EnterpriseMeta, args.PeerName) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,183 @@ | ||
package proxycfgglue | ||
|
||
import ( | ||
"context" | ||
"errors" | ||
"testing" | ||
"time" | ||
|
||
"github.com/hashicorp/consul/acl" | ||
"github.com/hashicorp/consul/agent/consul/state" | ||
"github.com/hashicorp/consul/agent/proxycfg" | ||
"github.com/hashicorp/consul/agent/structs" | ||
"github.com/hashicorp/consul/sdk/testutil" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
// TestServerHealthBlocking exercises the blocking health data source: requests for | ||
// another datacenter must be delegated to the remote source, and local requests must | ||
// emit updates on register/deregister, honor the bexpr filter, and enforce ACLs. | ||
func TestServerHealthBlocking(t *testing.T) { | ||
t.Run("remote queries are delegated to the remote source", func(t *testing.T) { | ||
var ( | ||
ctx = context.Background() | ||
req = &structs.ServiceSpecificRequest{Datacenter: "dc2"} | ||
correlationID = "correlation-id" | ||
ch = make(chan<- proxycfg.UpdateEvent) | ||
result = errors.New("KABOOM") | ||
) | ||
|
||
remoteSource := newMockHealth(t) | ||
remoteSource.On("Notify", ctx, req, correlationID, ch).Return(result) | ||
|
||
// The data source is local to dc1 while the request targets dc2, so the mock's | ||
// error must come back unchanged. | ||
store := state.NewStateStore(nil) | ||
dataSource := ServerHealthBlocking(ServerDataSourceDeps{Datacenter: "dc1"}, remoteSource, store) | ||
err := dataSource.Notify(ctx, req, correlationID, ch) | ||
require.Equal(t, result, err) | ||
}) | ||
|
||
t.Run("services notify correctly", func(t *testing.T) { | ||
const ( | ||
datacenter = "dc1" | ||
serviceName = "web" | ||
) | ||
|
||
ctx, cancel := context.WithCancel(context.Background()) | ||
t.Cleanup(cancel) | ||
|
||
store := state.NewStateStore(nil) | ||
aclResolver := newStaticResolver(acl.ManageAll()) | ||
dataSource := ServerHealthBlocking(ServerDataSourceDeps{ | ||
GetStore: func() Store { return store }, | ||
Datacenter: datacenter, | ||
ACLResolver: aclResolver, | ||
Logger: testutil.Logger(t), | ||
}, nil, store) | ||
// Shorten the watch timeout: the ACL steps below sleep for one full timeout period. | ||
dataSource.watchTimeout = 1 * time.Second | ||
|
||
// Watch for all events | ||
eventCh := make(chan proxycfg.UpdateEvent) | ||
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{ | ||
Datacenter: datacenter, | ||
ServiceName: serviceName, | ||
}, "", eventCh)) | ||
|
||
// Watch for a subset of events | ||
filteredCh := make(chan proxycfg.UpdateEvent) | ||
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{ | ||
Datacenter: datacenter, | ||
ServiceName: serviceName, | ||
QueryOptions: structs.QueryOptions{ | ||
Filter: "Service.ID == \"web1\"", | ||
}, | ||
}, "", filteredCh)) | ||
|
||
testutil.RunStep(t, "initial state", func(t *testing.T) { | ||
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh) | ||
require.Empty(t, result.Nodes) | ||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh) | ||
require.Empty(t, result.Nodes) | ||
}) | ||
|
||
testutil.RunStep(t, "register services", func(t *testing.T) { | ||
require.NoError(t, store.EnsureRegistration(10, &structs.RegisterRequest{ | ||
Datacenter: "dc1", | ||
Node: "foo", | ||
Address: "127.0.0.1", | ||
Service: &structs.NodeService{ | ||
ID: serviceName + "1", | ||
Service: serviceName, | ||
Port: 80, | ||
}, | ||
})) | ||
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh) | ||
require.Len(t, result.Nodes, 1) | ||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh) | ||
require.Len(t, result.Nodes, 1) | ||
|
||
// A second instance ("web2") shows up on the unfiltered watch only; the filtered | ||
// watch still reports just "web1". | ||
require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{ | ||
Datacenter: "dc1", | ||
Node: "foo", | ||
Address: "127.0.0.1", | ||
Service: &structs.NodeService{ | ||
ID: serviceName + "2", | ||
Service: serviceName, | ||
Port: 81, | ||
}, | ||
})) | ||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh) | ||
require.Len(t, result.Nodes, 2) | ||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh) | ||
require.Len(t, result.Nodes, 1) | ||
require.Equal(t, "web1", result.Nodes[0].Service.ID) | ||
}) | ||
|
||
testutil.RunStep(t, "deregister service", func(t *testing.T) { | ||
require.NoError(t, store.DeleteService(12, "foo", serviceName+"1", nil, "")) | ||
result := getEventResult[*structs.IndexedCheckServiceNodes](t, eventCh) | ||
require.Len(t, result.Nodes, 1) | ||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, filteredCh) | ||
require.Len(t, result.Nodes, 0) | ||
}) | ||
|
||
testutil.RunStep(t, "acl enforcement", func(t *testing.T) { | ||
// NOTE(review): index 11 is lower than the index 12 used by the deregistration | ||
// in the previous step - confirm that writing at an older index is intentional. | ||
require.NoError(t, store.EnsureRegistration(11, &structs.RegisterRequest{ | ||
Datacenter: "dc1", | ||
Node: "foo", | ||
Address: "127.0.0.1", | ||
Service: &structs.NodeService{ | ||
Service: serviceName + "-sidecar-proxy", | ||
Kind: structs.ServiceKindConnectProxy, | ||
Proxy: structs.ConnectProxyConfig{ | ||
DestinationServiceName: serviceName, | ||
}, | ||
}, | ||
})) | ||
|
||
authzDeny := policyAuthorizer(t, ``) | ||
authzAllow := policyAuthorizer(t, ` | ||
node_prefix "" { policy = "read" } | ||
service_prefix "web" { policy = "read" } | ||
`) | ||
|
||
// Start a stream where insufficient permissions are denied | ||
aclDenyCh := make(chan proxycfg.UpdateEvent) | ||
aclResolver.SwapAuthorizer(authzDeny) | ||
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{ | ||
Connect: true, | ||
Datacenter: datacenter, | ||
ServiceName: serviceName, | ||
}, "", aclDenyCh)) | ||
require.ErrorContains(t, getEventError(t, aclDenyCh), "Permission denied") | ||
|
||
// Adding ACL permissions will send valid data | ||
// Presumably swapping the authorizer does not wake the watch by itself, hence | ||
// the wait for one watch timeout before reading the next event. | ||
aclResolver.SwapAuthorizer(authzAllow) | ||
time.Sleep(dataSource.watchTimeout) | ||
result := getEventResult[*structs.IndexedCheckServiceNodes](t, aclDenyCh) | ||
require.Len(t, result.Nodes, 1) | ||
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service) | ||
|
||
// Start a stream where sufficient permissions are allowed | ||
aclAllowCh := make(chan proxycfg.UpdateEvent) | ||
aclResolver.SwapAuthorizer(authzAllow) | ||
require.NoError(t, dataSource.Notify(ctx, &structs.ServiceSpecificRequest{ | ||
Connect: true, | ||
Datacenter: datacenter, | ||
ServiceName: serviceName, | ||
}, "", aclAllowCh)) | ||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh) | ||
require.Len(t, result.Nodes, 1) | ||
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service) | ||
|
||
// Removing ACL permissions will send empty data | ||
aclResolver.SwapAuthorizer(authzDeny) | ||
time.Sleep(dataSource.watchTimeout) | ||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh) | ||
require.Len(t, result.Nodes, 0) | ||
|
||
// Adding ACL permissions will send valid data | ||
aclResolver.SwapAuthorizer(authzAllow) | ||
time.Sleep(dataSource.watchTimeout) | ||
result = getEventResult[*structs.IndexedCheckServiceNodes](t, aclAllowCh) | ||
require.Len(t, result.Nodes, 1) | ||
require.Equal(t, "web-sidecar-proxy", result.Nodes[0].Service.Service) | ||
}) | ||
}) | ||
} |