Skip to content

Commit

Permalink
Manage local server watches depending on mesh cfg
Browse files Browse the repository at this point in the history
Routing peering control plane traffic through mesh gateways can be
enabled or disabled at runtime with the mesh config entry.

This commit updates proxycfg to add or cancel watches for local servers
depending on this central config.

Note that WAN federation over mesh gateways is determined by a service
metadata flag, and any updates to the gateway service registration will
force the creation of a new snapshot. If enabled, WAN-fed over mesh
gateways will trigger a local server watch on initialize().

Because of this we will only add/remove server watches if WAN federation
over mesh gateways is disabled.
  • Loading branch information
freddygv committed Sep 23, 2022
1 parent 28fc980 commit d818d7b
Show file tree
Hide file tree
Showing 6 changed files with 254 additions and 20 deletions.
63 changes: 52 additions & 11 deletions agent/proxycfg/mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

cachetype "github.com/hashicorp/consul/agent/cache-types"
"github.com/hashicorp/consul/agent/proxycfg/internal/watch"
"github.com/hashicorp/consul/agent/structs"
"github.com/hashicorp/consul/lib/maps"
"github.com/hashicorp/consul/logging"
Expand All @@ -21,6 +22,8 @@ type handlerMeshGateway struct {
// initialize sets up the watches needed based on the current mesh gateway registration
func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, error) {
snap := newConfigSnapshotFromServiceInstance(s.serviceInstance, s.stateConfig)
snap.MeshGateway.WatchedConsulServers = watch.NewMap[string, structs.CheckServiceNodes]()

// Watch for root changes
err := s.dataSources.CARoots.Notify(ctx, &structs.DCSpecificRequest{
Datacenter: s.source.Datacenter,
Expand Down Expand Up @@ -76,7 +79,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
}

if s.proxyID.InDefaultPartition() {
if err := s.initializeCrossDCWatches(ctx); err != nil {
if err := s.initializeCrossDCWatches(ctx, &snap); err != nil {
return snap, err
}
}
Expand Down Expand Up @@ -123,7 +126,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
return snap, err
}

func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error {
func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context, snap *ConfigSnapshot) error {
if s.meta[structs.MetaWANFederationKey] == "1" {
// Conveniently we can just use this service meta attribute in one
// place here to set the machinery in motion and leave the conditional
Expand All @@ -145,6 +148,7 @@ func (s *handlerMeshGateway) initializeCrossDCWatches(ctx context.Context) error
if err != nil {
return err
}
snap.MeshGateway.WatchedConsulServers.InitWatch(structs.ConsulServiceName, nil)
}

err := s.dataSources.Datacenters.Notify(ctx, &structs.DatacentersRequest{
Expand Down Expand Up @@ -325,7 +329,6 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
return fmt.Errorf("invalid type for response: %T", u.Result)
}

// Do some initial sanity checks to avoid doing something dumb.
for _, csn := range resp.Nodes {
if csn.Service.Service != structs.ConsulServiceName {
return fmt.Errorf("expected service name %q but got %q",
Expand All @@ -337,7 +340,7 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
}
}

snap.MeshGateway.ConsulServers = resp.Nodes
snap.MeshGateway.WatchedConsulServers.Set(structs.ConsulServiceName, resp.Nodes)

case exportedServiceListWatchID:
exportedServices, ok := u.Result.(*structs.IndexedExportedServiceList)
Expand Down Expand Up @@ -463,17 +466,55 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn
return fmt.Errorf("invalid type for response: %T", u.Result)
}

if resp.Entry != nil {
meshConf, ok := resp.Entry.(*structs.MeshConfigEntry)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}
snap.MeshGateway.MeshConfig = meshConf
} else {
if resp.Entry == nil {
snap.MeshGateway.MeshConfig = nil

// We avoid managing server watches when WAN federation is enabled since it
// always requires server watches.
if s.meta[structs.MetaWANFederationKey] != "1" {
// If the entry was deleted we cancel watches that may have existed because of
// PeerThroughMeshGateways being set in the past.
snap.MeshGateway.WatchedConsulServers.CancelWatch(structs.ConsulServiceName)
}

snap.MeshGateway.MeshConfigSet = true
return nil
}

meshConf, ok := resp.Entry.(*structs.MeshConfigEntry)
if !ok {
return fmt.Errorf("invalid type for config entry: %T", resp.Entry)
}
snap.MeshGateway.MeshConfig = meshConf
snap.MeshGateway.MeshConfigSet = true

// We avoid managing Consul server watches when WAN federation is enabled since it
// always requires server watches.
if s.meta[structs.MetaWANFederationKey] == "1" {
return nil
}

if meshConf.Peering == nil || !meshConf.Peering.PeerThroughMeshGateways {
snap.MeshGateway.WatchedConsulServers.CancelWatch(structs.ConsulServiceName)
return nil
}
if snap.MeshGateway.WatchedConsulServers.IsWatched(structs.ConsulServiceName) {
return nil
}

notifyCtx, cancel := context.WithCancel(ctx)
err := s.dataSources.Health.Notify(notifyCtx, &structs.ServiceSpecificRequest{
Datacenter: s.source.Datacenter,
QueryOptions: structs.QueryOptions{Token: s.token},
ServiceName: structs.ConsulServiceName,
}, consulServerListWatchID, s.ch)
if err != nil {
cancel()
return fmt.Errorf("failed to watch local consul servers: %w", err)
}

snap.MeshGateway.WatchedConsulServers.InitWatch(structs.ConsulServiceName, cancel)

default:
switch {
case strings.HasPrefix(u.CorrelationID, "connect-service:"):
Expand Down
16 changes: 11 additions & 5 deletions agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -375,8 +375,11 @@ type configSnapshotMeshGateway struct {
// datacenter.
FedStateGateways map[string]structs.CheckServiceNodes

// ConsulServers is the list of consul servers in this datacenter.
ConsulServers structs.CheckServiceNodes
// WatchedConsulServers is a map of (structs.ConsulServiceName -> structs.CheckServiceNodes)`
// Mesh gateways can spin up watches for local servers both for
// WAN federation and for peering. This map ensures we only have one
// watch at a time.
WatchedConsulServers watch.Map[string, structs.CheckServiceNodes]

// HostnameDatacenters is a map of datacenters to mesh gateway instances with a hostname as the address.
// If hostnames are configured they must be provided to Envoy via CDS not EDS.
Expand Down Expand Up @@ -556,8 +559,8 @@ func (c *configSnapshotMeshGateway) isEmpty() bool {
len(c.ServiceResolvers) == 0 &&
len(c.GatewayGroups) == 0 &&
len(c.FedStateGateways) == 0 &&
len(c.ConsulServers) == 0 &&
len(c.HostnameDatacenters) == 0 &&
c.WatchedConsulServers.Len() == 0 &&
c.isEmptyPeering()
}

Expand Down Expand Up @@ -690,8 +693,11 @@ func (s *ConfigSnapshot) Valid() bool {
s.TerminatingGateway.MeshConfigSet

case structs.ServiceKindMeshGateway:
if s.ServiceMeta[structs.MetaWANFederationKey] == "1" {
if len(s.MeshGateway.ConsulServers) == 0 {
if s.MeshGateway.WatchedConsulServers.Len() == 0 {
if s.ServiceMeta[structs.MetaWANFederationKey] == "1" {
return false
}
if cfg := s.MeshConfig(); cfg != nil && cfg.Peering != nil && cfg.Peering.PeerThroughMeshGateways {
return false
}
}
Expand Down
184 changes: 184 additions & 0 deletions agent/proxycfg/state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -779,6 +779,9 @@ func TestState_WatchesAndUpdates(t *testing.T) {
Service: "mesh-gateway",
Address: "10.0.1.1",
Port: 443,
Meta: map[string]string{
structs.MetaWANFederationKey: "1",
},
},
sourceDC: "dc1",
stages: []verificationStage{
Expand All @@ -790,6 +793,7 @@ func TestState_WatchesAndUpdates(t *testing.T) {
exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
peeringTrustBundlesWatchID: genVerifyTrustBundleListWatchForMeshGateway(""),
consulServerListWatchID: genVerifyServiceSpecificPeeredRequest(structs.ConsulServiceName, "", "dc1", "", false),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "gateway without root is not valid")
Expand Down Expand Up @@ -1015,6 +1019,186 @@ func TestState_WatchesAndUpdates(t *testing.T) {
},
},
},
"mesh-gateway-peering-control-plane": {
ns: structs.NodeService{
Kind: structs.ServiceKindMeshGateway,
ID: "mesh-gateway",
Service: "mesh-gateway",
Address: "10.0.1.1",
Port: 443,
},
sourceDC: "dc1",
stages: []verificationStage{
{
requiredWatches: map[string]verifyWatchRequest{
datacentersWatchID: verifyDatacentersWatch,
serviceListWatchID: genVerifyDCSpecificWatch("dc1"),
rootsWatchID: genVerifyDCSpecificWatch("dc1"),
exportedServiceListWatchID: genVerifyDCSpecificWatch("dc1"),
meshConfigEntryID: genVerifyMeshConfigWatch("dc1"),
peeringTrustBundlesWatchID: genVerifyTrustBundleListWatchForMeshGateway(""),
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.False(t, snap.Valid(), "gateway without root is not valid")
},
},
{
events: []UpdateEvent{
rootWatchEvent(),
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
},
},
},
{
CorrelationID: exportedServiceListWatchID,
Result: &structs.IndexedExportedServiceList{
Services: nil,
},
},
{
CorrelationID: serviceListWatchID,
Result: &structs.IndexedServiceList{
Services: structs.ServiceList{},
},
},
{
CorrelationID: peeringTrustBundlesWatchID,
Result: &pbpeering.TrustBundleListByServiceResponse{
Bundles: nil,
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.Equal(t, indexedRoots, snap.Roots)
require.True(t, snap.MeshGateway.WatchedServicesSet)
require.True(t, snap.MeshGateway.PeeringTrustBundlesSet)
require.True(t, snap.MeshGateway.MeshConfigSet)

require.True(t, snap.Valid(), "gateway without services is valid")
require.True(t, snap.ConnectProxy.isEmpty())
},
},
{
requiredWatches: map[string]verifyWatchRequest{
consulServerListWatchID: genVerifyServiceSpecificPeeredRequest(structs.ConsulServiceName, "", "dc1", "", false),
},
events: []UpdateEvent{
{
CorrelationID: consulServerListWatchID,
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "replica1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{"read_replica": "true"},
},
},
},
},
Err: nil,
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())

servers, ok := snap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
require.True(t, ok)

expect := structs.CheckServiceNodes{
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "replica1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{"read_replica": "true"},
},
},
}
require.Equal(t, expect, servers)
},
},
{
events: []UpdateEvent{
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: false,
},
},
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
require.NotNil(t, snap.MeshConfig())

require.False(t, snap.MeshGateway.WatchedConsulServers.IsWatched(structs.ConsulServiceName))
servers, ok := snap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
require.False(t, ok)
require.Empty(t, servers)
},
},
{
events: []UpdateEvent{
{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: nil,
},
},
},
verifySnapshot: func(t testing.TB, snap *ConfigSnapshot) {
require.True(t, snap.Valid())
require.Nil(t, snap.MeshConfig())

require.False(t, snap.MeshGateway.WatchedConsulServers.IsWatched(structs.ConsulServiceName))
servers, ok := snap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
require.False(t, ok)
require.Empty(t, servers)
},
},
},
},
"ingress-gateway": {
ns: structs.NodeService{
Kind: structs.ServiceKindIngressGateway,
Expand Down
3 changes: 2 additions & 1 deletion agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -386,7 +386,8 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
}

// And for the current datacenter, send all flavors appropriately.
for _, srv := range cfgSnap.MeshGateway.ConsulServers {
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
opts := clusterOpts{
name: cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node),
}
Expand Down
3 changes: 2 additions & 1 deletion agent/xds/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,8 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
cfgSnap.ServerSNIFn != nil {
var allServersLbEndpoints []*envoy_endpoint_v3.LbEndpoint

for _, srv := range cfgSnap.MeshGateway.ConsulServers {
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
clusterName := cfgSnap.ServerSNIFn(cfgSnap.Datacenter, srv.Node.Node)

_, addr, port := srv.BestAddress(false /*wan*/)
Expand Down
Loading

0 comments on commit d818d7b

Please sign in to comment.