Skip to content

Commit

Permalink
fixup! Make the mesh gateway changes to allow local mode for cluste…
Browse files Browse the repository at this point in the history
…r peering data plane traffic
  • Loading branch information
erichaberkorn committed Oct 5, 2022
1 parent e5aca17 commit ebe0a48
Show file tree
Hide file tree
Showing 16 changed files with 124 additions and 164 deletions.
15 changes: 12 additions & 3 deletions agent/proxycfg/mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func (s *handlerMeshGateway) initialize(ctx context.Context) (ConfigSnapshot, er
snap.MeshGateway.WatchedDiscoveryChains = make(map[structs.ServiceName]context.CancelFunc)
snap.MeshGateway.WatchedPeeringServices = make(map[string]map[structs.ServiceName]context.CancelFunc)
snap.MeshGateway.WatchedPeers = make(map[string]context.CancelFunc)
snap.MeshGateway.PeeringServices = make(map[string]map[structs.ServiceName]structs.CheckServiceNodes)
snap.MeshGateway.PeeringServices = make(map[string]map[structs.ServiceName]PeeringServicesValue)

// there is no need to initialize the map of service resolvers as we
// fully rebuild it every time we get updates
Expand Down Expand Up @@ -645,10 +645,19 @@ func (s *handlerMeshGateway) handleUpdate(ctx context.Context, u UpdateEvent, sn

if len(resp.Nodes) > 0 {
if _, ok := snap.MeshGateway.PeeringServices[peer]; !ok {
snap.MeshGateway.PeeringServices[peer] = make(map[structs.ServiceName]structs.CheckServiceNodes)
snap.MeshGateway.PeeringServices[peer] = make(map[structs.ServiceName]PeeringServicesValue)
}

snap.MeshGateway.PeeringServices[peer][sn] = resp.Nodes
if eps := hostnameEndpoints(s.logger, GatewayKey{}, resp.Nodes); len(eps) > 0 {
snap.MeshGateway.PeeringServices[peer][sn] = PeeringServicesValue{
Nodes: eps,
UseCDS: true,
}
} else {
snap.MeshGateway.PeeringServices[peer][sn] = PeeringServicesValue{
Nodes: resp.Nodes,
}
}
} else if _, ok := snap.MeshGateway.PeeringServices[peer]; ok {
delete(snap.MeshGateway.PeeringServices[peer], sn)
}
Expand Down
7 changes: 6 additions & 1 deletion agent/proxycfg/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -348,6 +348,11 @@ func (c *configSnapshotTerminatingGateway) isEmpty() bool {
!c.MeshConfigSet
}

type PeeringServicesValue struct {
Nodes structs.CheckServiceNodes
UseCDS bool
}

type configSnapshotMeshGateway struct {
// WatchedServices is a map of service name to a cancel function. This cancel
// function is tied to the watch of connect enabled services for the given
Expand Down Expand Up @@ -376,7 +381,7 @@ type configSnapshotMeshGateway struct {
// PeeringServices is a map of peer name -> (map of
// service name -> CheckServiceNodes) and is used to determine the backing
// endpoints of a service on a peer.
PeeringServices map[string]map[structs.ServiceName]structs.CheckServiceNodes
PeeringServices map[string]map[structs.ServiceName]PeeringServicesValue

// WatchedPeeringServices is a map of peer name -> (map of service name ->
// cancel function) and is used to track watches on services within a peer.
Expand Down
25 changes: 0 additions & 25 deletions agent/proxycfg/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,31 +280,6 @@ func TestUpstreamNodesDC2(t testing.T) structs.CheckServiceNodes {
}
}

func TestUpstreamNodesPeerCluster01(t testing.T) structs.CheckServiceNodes {
peer := "cluster-01"
service := structs.TestNodeServiceWithNameInPeer(t, "web", peer)
return structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{
ID: "test1",
Node: "test1",
Address: "10.40.1.1",
PeerName: peer,
},
Service: service,
},
structs.CheckServiceNode{
Node: &structs.Node{
ID: "test2",
Node: "test2",
Address: "10.40.1.2",
PeerName: peer,
},
Service: service,
},
}
}

func TestUpstreamNodesInStatusDC2(t testing.T, status string) structs.CheckServiceNodes {
return structs.CheckServiceNodes{
structs.CheckServiceNode{
Expand Down
40 changes: 4 additions & 36 deletions agent/proxycfg/testing_mesh_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -659,49 +659,17 @@ func TestConfigSnapshotPeeredMeshGateway(t testing.T, variant string, nsFn func(
CorrelationID: "peering-connect-service:peer-a:db",
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{
ID: "test1",
Node: "test1",
Address: "10.40.1.1",
PeerName: "peer-a",
},
Service: structs.TestNodeServiceWithNameInPeer(t, "db", "peer-a"),
},
structs.CheckServiceNode{
Node: &structs.Node{
ID: "test2",
Node: "test2",
Address: "10.40.1.2",
PeerName: "peer-a",
},
Service: structs.TestNodeServiceWithNameInPeer(t, "db", "peer-a"),
},
structs.TestCheckNodeServiceWithNameInPeer(t, "db", "peer-a", "10.40.1.1", false),
structs.TestCheckNodeServiceWithNameInPeer(t, "db", "peer-a", "10.40.1.2", false),
},
},
},
UpdateEvent{
CorrelationID: "peering-connect-service:peer-b:alt",
Result: &structs.IndexedCheckServiceNodes{
Nodes: structs.CheckServiceNodes{
structs.CheckServiceNode{
Node: &structs.Node{
ID: "test1",
Node: "test1",
Address: "10.40.2.1",
PeerName: "peer-b",
},
Service: structs.TestNodeServiceWithNameInPeer(t, "alt", "peer-b"),
},
structs.CheckServiceNode{
Node: &structs.Node{
ID: "test2",
Node: "test2",
Address: "10.40.2.2",
PeerName: "peer-b",
},
Service: structs.TestNodeServiceWithNameInPeer(t, "alt", "peer-b"),
},
structs.TestCheckNodeServiceWithNameInPeer(t, "alt", "peer-b", "10.40.2.1", false),
structs.TestCheckNodeServiceWithNameInPeer(t, "alt", "peer-b", "10.40.2.2", true),
},
},
},
Expand Down
4 changes: 2 additions & 2 deletions agent/proxycfg/testing_upstreams.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
events = append(events, UpdateEvent{
CorrelationID: "upstream-peer:db?peer=cluster-01",
Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesPeerCluster01(t),
Nodes: structs.CheckServiceNodes{structs.TestCheckNodeServiceWithNameInPeer(t, "db", "cluster-01", "10.40.1.1", false)},
},
})
case "redirect-to-cluster-peer":
Expand All @@ -106,7 +106,7 @@ func setupTestVariationConfigEntriesAndSnapshot(
events = append(events, UpdateEvent{
CorrelationID: "upstream-peer:db?peer=cluster-01",
Result: &structs.IndexedCheckServiceNodes{
Nodes: TestUpstreamNodesPeerCluster01(t),
Nodes: structs.CheckServiceNodes{structs.TestCheckNodeServiceWithNameInPeer(t, "db", "cluster-01", "10.40.1.1", false)},
},
})
case "failover-through-double-remote-gateway-triggered":
Expand Down
27 changes: 25 additions & 2 deletions agent/structs/testing_catalog.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func TestNodeServiceWithName(t testing.T, name string) *NodeService {

const peerTrustDomain = "1c053652-8512-4373-90cf-5a7f6263a994.consul"

func TestNodeServiceWithNameInPeer(t testing.T, name string, peer string) *NodeService {
return &NodeService{
func TestCheckNodeServiceWithNameInPeer(t testing.T, name, peer, ip string, useHostname bool) CheckServiceNode {
service := &NodeService{
Kind: ServiceKindTypical,
Service: name,
Port: 8080,
Expand All @@ -73,6 +73,29 @@ func TestNodeServiceWithNameInPeer(t testing.T, name string, peer string) *NodeS
},
},
}

if useHostname {
service.TaggedAddresses = map[string]ServiceAddress{
TaggedAddressLAN: {
Address: ip,
Port: 443,
},
TaggedAddressWAN: {
Address: name + ".us-east-1.elb.notaws.com",
Port: 8443,
},
}
}

return CheckServiceNode{
Node: &Node{
ID: "test1",
Node: "test1",
Address: ip,
Datacenter: "cloud-dc",
},
Service: service,
}
}

// TestNodeServiceProxy returns a *NodeService representing a valid
Expand Down
43 changes: 33 additions & 10 deletions agent/xds/clusters.go
Original file line number Diff line number Diff line change
Expand Up @@ -575,23 +575,46 @@ func (s *ResourceGenerator) makeGatewayOutgoingClusterPeeringServiceClusters(cfg

for _, serviceGroups := range cfgSnap.MeshGateway.PeeringServices {
for sn, serviceGroup := range serviceGroups {
var clusterName string
var isRemote bool
if len(serviceGroup) > 0 {
node := serviceGroup[0]
if node.Service == nil {
return nil, fmt.Errorf("couldn't get SNI for peered service %s", sn.String())
}
clusterName = node.Service.Connect.PeerMeta.PrimarySNI()
isRemote = !cfgSnap.Locality.Matches(node.Node.Datacenter, node.Node.PartitionOrDefault())
if len(serviceGroup.Nodes) == 0 {
continue
}

node := serviceGroup.Nodes[0]
if node.Service == nil {
return nil, fmt.Errorf("couldn't get SNI for peered service %s", sn.String())
}
// This uses the SNI in the accepting cluster peer so the remote mesh
// gateway can distinguish between an exported service as opposed to the
// usual mesh gateway route for a service.
clusterName := node.Service.Connect.PeerMeta.PrimarySNI()

opts := clusterOpts{
name: clusterName,
isRemote: isRemote,
isRemote: true,
}
cluster := s.makeGatewayCluster(cfgSnap, opts)

if serviceGroup.UseCDS {
configureClusterWithHostnames(
s.Logger,
cluster,
"", /*TODO:make configurable?*/
serviceGroup.Nodes,
true, /*isRemote*/
false, /*onlyPassing*/
)
} else {
cluster.ClusterDiscoveryType = &envoy_cluster_v3.Cluster_Type{Type: envoy_cluster_v3.Cluster_EDS}
cluster.EdsClusterConfig = &envoy_cluster_v3.Cluster_EdsClusterConfig{
EdsConfig: &envoy_core_v3.ConfigSource{
ResourceApiVersion: envoy_core_v3.ApiVersion_V3,
ConfigSourceSpecifier: &envoy_core_v3.ConfigSource_Ads{
Ads: &envoy_core_v3.AggregatedConfigSource{},
},
},
}
}

clusters = append(clusters, cluster)
}
}
Expand Down
20 changes: 12 additions & 8 deletions agent/xds/endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,16 +413,20 @@ func (s *ResourceGenerator) makeEndpointsForOutgoingPeeredServices(
// generate the endpoints for the linked service groups
for _, serviceGroups := range cfgSnap.MeshGateway.PeeringServices {
for sn, serviceGroup := range serviceGroups {
var clusterName string
if len(serviceGroup) > 0 {
node := serviceGroup[0]
if node.Service == nil {
return nil, fmt.Errorf("couldn't get SNI for peered service %s", sn.String())
}
clusterName = node.Service.Connect.PeerMeta.PrimarySNI()
if serviceGroup.UseCDS || len(serviceGroup.Nodes) == 0 {
continue
}

node := serviceGroup.Nodes[0]
if node.Service == nil {
return nil, fmt.Errorf("couldn't get SNI for peered service %s", sn.String())
}
// This uses the SNI in the accepting cluster peer so the remote mesh
// gateway can distinguish between an exported service as opposed to the
// usual mesh gateway route for a service.
clusterName := node.Service.Connect.PeerMeta.PrimarySNI()

groups := []loadAssignmentEndpointGroup{{Endpoints: serviceGroup, OnlyPassing: false}}
groups := []loadAssignmentEndpointGroup{{Endpoints: serviceGroup.Nodes, OnlyPassing: false}}

la := makeLoadAssignment(
clusterName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@
},
"matchSubjectAltNames": [
{
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/web"
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/db"
}
]
}
},
"sni": "web.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
"sni": "db.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@
},
"matchSubjectAltNames": [
{
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/web"
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/db"
}
]
}
},
"sni": "web.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
"sni": "db.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,12 +66,12 @@
},
"matchSubjectAltNames": [
{
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/web"
"exact": "spiffe://1c053652-8512-4373-90cf-5a7f6263a994.consul/ns/default/dc/cluster-01-dc/svc/db"
}
]
}
},
"sni": "web.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
"sni": "db.default.default.cluster-01.external.1c053652-8512-4373-90cf-5a7f6263a994.consul"
}
}
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
{
"@type": "type.googleapis.com/envoy.config.cluster.v3.Cluster",
"name": "alt.default.default.peer-b.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
"type": "EDS",
"type": "LOGICAL_DNS",
"edsClusterConfig": {
"edsConfig": {
"ads": {
Expand All @@ -14,6 +14,29 @@
}
},
"connectTimeout": "5s",
"loadAssignment": {
"clusterName": "alt.default.default.peer-b.external.1c053652-8512-4373-90cf-5a7f6263a994.consul",
"endpoints": [
{
"lbEndpoints": [
{
"endpoint": {
"address": {
"socketAddress": {
"address": "alt.us-east-1.elb.notaws.com",
"portValue": 8443
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
]
},
"dnsRefreshRate": "10s",
"dnsLookupFamily": "V4_ONLY",
"outlierDetection": {

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,6 @@
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
},
{
"endpoint": {
"address": {
"socketAddress": {
"address": "10.40.1.2",
"portValue": 8080
}
}
},
"healthStatus": "HEALTHY",
"loadBalancingWeight": 1
}
]
}
Expand Down
Loading

0 comments on commit ebe0a48

Please sign in to comment.