Skip to content

Commit

Permalink
Update xds generation for peering over mesh gws
Browse files Browse the repository at this point in the history
This commit adds the xDS resources needed for INBOUND traffic from peer
clusters:

- 1 filter chain for all inbound peering requests.
- 1 cluster for all inbound peering requests.
- 1 endpoint per voting server with the gRPC TLS port configured.

There is one filter chain and cluster because unlike with WAN
federation, peer clusters will not attempt to dial individual servers.
Peer clusters will only dial the local mesh gateway addresses.
  • Loading branch information
freddygv committed Sep 23, 2022
1 parent d818d7b commit 0d6d338
Show file tree
Hide file tree
Showing 9 changed files with 340 additions and 2 deletions.
100 changes: 100 additions & 0 deletions agent/proxycfg/testing_mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,106 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
)

switch variant {
case "control-plane":
extraUpdates = append(extraUpdates,
UpdateEvent{
CorrelationID: meshConfigEntryID,
Result: &structs.ConfigEntryResponse{
Entry: &structs.MeshConfigEntry{
Peering: &structs.PeeringMeshConfig{
PeerThroughMeshGateways: true,
},
},
},
},
UpdateEvent{
CorrelationID: consulServerListWatchID,
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "replica",
Address: "127.0.0.10",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
// Read replicas cannot handle peering requests.
Meta: map[string]string{"read_replica": "true"},
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node1",
Address: "127.0.0.1",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{
"grpc_port": "8502",
"grpc_tls_port": "8503",
},
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node2",
Address: "127.0.0.2",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{
"grpc_port": "8502",
"grpc_tls_port": "8503",
},
TaggedAddresses: map[string]structs.ServiceAddress{
// WAN address is not considered for traffic from local gateway to local servers.
structs.TaggedAddressWAN: {
Address: "consul.server.dc1.my-domain",
Port: 10101,
},
},
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node3",
Address: "127.0.0.3",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{
// Peering is not allowed over deprecated non-TLS gRPC port.
"grpc_port": "8502",
},
},
},
{
Node: &structs.Node{
Datacenter: "dc1",
Node: "node4",
Address: "127.0.0.4",
},
Service: &structs.NodeService{
ID: structs.ConsulServiceID,
Service: structs.ConsulServiceName,
Meta: map[string]string{
// Must have valid gRPC port.
"grpc_tls_port": "bad",
},
},
},
},
},
},
)
case "default-services-http":
proxyDefaults := &structs.ProxyConfigEntry{
Config: map[string]interface{}{
Expand Down
24 changes: 24 additions & 0 deletions agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,6 +396,30 @@ func (s *ResourceGenerator) clustersFromSnapshotMeshGateway(cfgSnap *proxycfg.Co
}
}

// Create a single cluster for local servers to be dialed by peers.
// When peering through gateways we load balance across the local servers. They cannot be addressed individually.
if cfg := cfgSnap.MeshConfig(); cfg != nil && cfg.Peering != nil && cfg.Peering.PeerThroughMeshGateways {
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)

hasVoters := false
for _, srv := range servers {
if isReplica := srv.Service.Meta["read_replica"]; isReplica == "true" {
// Peering control-plane traffic can only ever be handled by the local leader.
// We avoid routing to read replicas since they will never be Raft voters.
continue
}
hasVoters = true
break
}

if hasVoters {
cluster := s.makeGatewayCluster(cfgSnap, clusterOpts{
name: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
})
clusters = append(clusters, cluster)
}
}

// generate the per-service/subset clusters
c, err := s.makeGatewayServiceClusters(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
if err != nil {
Expand Down
46 changes: 45 additions & 1 deletion agent/xds/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,11 @@ package xds
import (
"errors"
"fmt"
"strconv"

envoy_cluster_v3 "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
envoy_core_v3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
envoy_endpoint_v3 "github.com/envoyproxy/go-control-plane/envoy/config/endpoint/v3"

"github.com/golang/protobuf/proto"
bexpr "github.com/hashicorp/go-bexpr"

Expand Down Expand Up @@ -285,6 +285,50 @@ func (s *ResourceGenerator) endpointsFromSnapshotMeshGateway(cfgSnap *proxycfg.C
})
}

// Create endpoints for the cluster where local servers will be dialed by peers.
// When peering through gateways we load balance across the local servers. They cannot be addressed individually.
if cfg := cfgSnap.MeshConfig(); cfg != nil && cfg.Peering != nil && cfg.Peering.PeerThroughMeshGateways {
var serverEndpoints []*envoy_endpoint_v3.LbEndpoint

servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)
for _, srv := range servers {
if isReplica := srv.Service.Meta["read_replica"]; isReplica == "true" {
// Peering control-plane traffic can only ever be handled by the local leader.
// We avoid routing to read replicas since they will never be Raft voters.
continue
}

_, addr, _ := srv.BestAddress(false)
portStr, ok := srv.Service.Meta["grpc_tls_port"]
if !ok {
s.Logger.Warn("peering is enabled but local server %q does not have the required gRPC TLS port configured",
"server", srv.Node.Node)
continue
}
port, err := strconv.Atoi(portStr)
if err != nil {
s.Logger.Error("peering is enabled but local server has invalid gRPC TLS port",
"server", srv.Node.Node, "port", portStr, "error", err)
continue
}

serverEndpoints = append(serverEndpoints, &envoy_endpoint_v3.LbEndpoint{
HostIdentifier: &envoy_endpoint_v3.LbEndpoint_Endpoint{
Endpoint: &envoy_endpoint_v3.Endpoint{
Address: makeAddress(addr, port),
},
},
})
}

resources = append(resources, &envoy_endpoint_v3.ClusterLoadAssignment{
ClusterName: connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain),
Endpoints: []*envoy_endpoint_v3.LocalityLbEndpoints{{
LbEndpoints: serverEndpoints,
}},
})
}

// Generate the endpoints for each service and its subsets
e, err := s.endpointsFromServicesAndResolvers(cfgSnap, cfgSnap.MeshGateway.ServiceGroups, cfgSnap.MeshGateway.ServiceResolvers)
if err != nil {
Expand Down
38 changes: 37 additions & 1 deletion agent/xds/listeners.go
Original file line number Diff line number Diff line change
Expand Up @@ -1751,7 +1751,7 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,

l.FilterChains = append(l.FilterChains, &envoy_listener_v3.FilterChain{
FilterChainMatch: &envoy_listener_v3.FilterChainMatch{
ServerNames: []string{fmt.Sprintf("%s", clusterName)},
ServerNames: []string{clusterName},
},
Filters: []*envoy_listener_v3.Filter{
dcTCPProxy,
Expand All @@ -1760,6 +1760,42 @@ func (s *ResourceGenerator) makeMeshGatewayListener(name, addr string, port int,
}
}

// Create a single cluster for local servers to be dialed by peers.
// When peering through gateways we load balance across the local servers. They cannot be addressed individually.
if cfg := cfgSnap.MeshConfig(); cfg != nil && cfg.Peering != nil && cfg.Peering.PeerThroughMeshGateways {
servers, _ := cfgSnap.MeshGateway.WatchedConsulServers.Get(structs.ConsulServiceName)

hasVoters := false
for _, srv := range servers {
if isReplica := srv.Service.Meta["read_replica"]; isReplica == "true" {
// Peering control-plane traffic can only ever be handled by the local leader.
// We avoid routing to read replicas since they will never be Raft voters.
continue
}
hasVoters = true
break
}

if hasVoters {
clusterName := connect.PeeringServerSAN(cfgSnap.Datacenter, cfgSnap.Roots.TrustDomain)
filterName := fmt.Sprintf("%s.%s", name, cfgSnap.Datacenter)

filter, err := makeTCPProxyFilter(filterName, clusterName, "mesh_gateway_local_peering_server.")
if err != nil {
return nil, err
}

l.FilterChains = append(l.FilterChains, &envoy_listener_v3.FilterChain{
FilterChainMatch: &envoy_listener_v3.FilterChainMatch{
ServerNames: []string{clusterName},
},
Filters: []*envoy_listener_v3.Filter{
filter,
},
})
}
}

// This needs to get tacked on at the end as it has no
// matching and will act as a catch all
l.FilterChains = append(l.FilterChains, sniClusterChain)
Expand Down
6 changes: 6 additions & 0 deletions agent/xds/resources_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,12 @@ func getMeshGatewayPeeringGoldenTestCases() []goldenTestCase {
return proxycfg.TestConfigSnapshotPeeredMeshGateway(t, "chain-and-l7-stuff", nil, nil)
},
},
{
name: "mesh-gateway-peering-control-plane",
create: func(t testinf.T) *proxycfg.ConfigSnapshot {
return proxycfg.TestConfigSnapshotPeeredMeshGateway(t, "control-plane", nil, nil)
},
},
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul",
"type": "EDS",
"edsClusterConfig": {
"edsConfig": {
"ads": {

},
"resourceApiVersion": "V3"
}
},
"connectTimeout": "5s",
"outlierDetection": {

}
}
],
"typeUrl": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"nonce": "00000001"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"clusterName": "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.1",
"portValue": 8503
}
}
}
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "127.0.0.2",
"portValue": 8503
}
}
}
}
]
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.config.endpoint.v3.ClusterLoadAssignment",
"nonce": "00000001"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
{
"versionInfo": "00000001",
"resources": [
{
"@type": "type.googleapis.com/envoy.config.listener.v3.Listener",
"name": "default:1.2.3.4:8443",
"address": {
"socketAddress": {
"address": "1.2.3.4",
"portValue": 8443
}
},
"filterChains": [
{
"filterChainMatch": {
"serverNames": [
"server.dc1.peering.11111111-2222-3333-4444-555555555555.consul"
]
},
"filters": [
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_local_peering_server.default.dc1",
"cluster": "server.dc1.peering.11111111-2222-3333-4444-555555555555.consul"
}
}
]
},
{
"filters": [
{
"name": "envoy.filters.network.sni_cluster",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.sni_cluster.v3.SniCluster"
}
},
{
"name": "envoy.filters.network.tcp_proxy",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.network.tcp_proxy.v3.TcpProxy",
"statPrefix": "mesh_gateway_local.default",
"cluster": ""
}
}
]
}
],
"listenerFilters": [
{
"name": "envoy.filters.listener.tls_inspector",
"typedConfig": {
"@type": "type.googleapis.com/envoy.extensions.filters.listener.tls_inspector.v3.TlsInspector"
}
}
]
}
],
"typeUrl": "type.googleapis.com/envoy.config.listener.v3.Listener",
"nonce": "00000001"
}
Loading

0 comments on commit 0d6d338

Please sign in to comment.