From 64703d7994c32d9040d8f9165895559d35d3a375 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Wed, 1 Mar 2023 23:02:43 -0500 Subject: [PATCH 1/4] Fix resolution of service resolvers with subsets for external upstreams --- agent/rpcclient/health/view.go | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/agent/rpcclient/health/view.go b/agent/rpcclient/health/view.go index fd19cb4a001b..3cadea10564b 100644 --- a/agent/rpcclient/health/view.go +++ b/agent/rpcclient/health/view.go @@ -52,6 +52,8 @@ func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) { return &HealthView{ state: make(map[string]structs.CheckServiceNode), filter: fe, + name: req.ServiceName, + kind: req.ServiceKind, }, nil } @@ -61,6 +63,8 @@ func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) { // (IndexedCheckServiceNodes) and update it in place for each event - that // involves re-sorting each time etc. though. type HealthView struct { + name string + kind structs.ServiceKind state map[string]structs.CheckServiceNode filter filterEvaluator } @@ -84,6 +88,13 @@ func (s *HealthView) Update(events []*pbsubscribe.Event) error { if csn == nil { return errors.New("check service node was unexpectedly nil") } + + // check if we intentionally need to skip the filter + if s.skipFilter(csn) { + s.state[id] = *csn + continue + } + passed, err := s.filter.Evaluate(*csn) if err != nil { return err @@ -100,6 +111,17 @@ func (s *HealthView) Update(events []*pbsubscribe.Event) error { return nil } +func (s *HealthView) skipFilter(csn *structs.CheckServiceNode) bool { + // we only do this for services that need to be routed through a gateway + if s.kind != "" { + return false + } + if s.name != csn.Service.Service && csn.Service.Kind == structs.ServiceKindTerminatingGateway { + return true + } + return false +} + type filterEvaluator interface { Evaluate(datum interface{}) (bool, error) } From 8a8bd523e8dc47cdce80ebd49ee24a217bdc6b6e Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Thu, 2 Mar 2023 16:48:54 -0500 Subject: [PATCH 2/4] Add tests --- agent/rpcclient/health/view_test.go | 35 +++++++++++++++ .../capture.sh | 1 + .../service_s3.hcl | 17 ++++++++ .../case-terminating-gateway-subsets/setup.sh | 1 + .../case-terminating-gateway-subsets/vars.sh | 1 + .../verify.bats | 4 ++ test/integration/connect/envoy/helpers.bash | 43 +++++++++++++++++++ 7 files changed, 102 insertions(+) create mode 100644 test/integration/connect/envoy/case-terminating-gateway-subsets/service_s3.hcl diff --git a/agent/rpcclient/health/view_test.go b/agent/rpcclient/health/view_test.go index 8fcb50da3396..4d0308380cc1 100644 --- a/agent/rpcclient/health/view_test.go +++ b/agent/rpcclient/health/view_test.go @@ -941,3 +941,38 @@ func TestNewFilterEvaluator(t *testing.T) { }) } } + +func TestHealthView_SkipFilteringTerminatingGateways(t *testing.T) { + view, err := NewHealthView(structs.ServiceSpecificRequest{ + ServiceName: "web", + QueryOptions: structs.QueryOptions{ + Filter: "Service.Meta.version == \"v1\"", + }, + }) + require.NoError(t, err) + + err = view.Update([]*pbsubscribe.Event{{ + Index: 1, + Payload: &pbsubscribe.Event_ServiceHealth{ + ServiceHealth: &pbsubscribe.ServiceHealthUpdate{ + Op: pbsubscribe.CatalogOp_Register, + CheckServiceNode: &pbservice.CheckServiceNode{ + Service: &pbservice.NodeService{ + Kind: structs.TerminatingGateway, + Service: "gateway", + Address: "127.0.0.1", + Port: 8443, + }, + }, + }, + }, + }}) + require.NoError(t, err) + + node, ok := (view.Result(1)).(*structs.IndexedCheckServiceNodes) + require.True(t, ok) + + require.Len(t, node.Nodes, 1) + require.Equal(t, "127.0.0.1", node.Nodes[0].Service.Address) + require.Equal(t, 8443, node.Nodes[0].Service.Port) +} diff --git a/test/integration/connect/envoy/case-terminating-gateway-subsets/capture.sh b/test/integration/connect/envoy/case-terminating-gateway-subsets/capture.sh index 2ef0c41a215e..261bf4e29a6c 100644 --- a/test/integration/connect/envoy/case-terminating-gateway-subsets/capture.sh +++ b/test/integration/connect/envoy/case-terminating-gateway-subsets/capture.sh @@ -2,3 +2,4 @@ snapshot_envoy_admin localhost:20000 terminating-gateway primary || true snapshot_envoy_admin localhost:19000 s1 primary || true +snapshot_envoy_admin localhost:19001 s3 primary || true diff --git a/test/integration/connect/envoy/case-terminating-gateway-subsets/service_s3.hcl b/test/integration/connect/envoy/case-terminating-gateway-subsets/service_s3.hcl new file mode 100644 index 000000000000..eb84c578ee9e --- /dev/null +++ b/test/integration/connect/envoy/case-terminating-gateway-subsets/service_s3.hcl @@ -0,0 +1,17 @@ +services { + id = "s3" + name = "s3" + port = 8184 + connect { + sidecar_service { + proxy { + upstreams = [ + { + destination_name = "s2" + local_bind_port = 8185 + } + ] + } + } + } +} diff --git a/test/integration/connect/envoy/case-terminating-gateway-subsets/setup.sh b/test/integration/connect/envoy/case-terminating-gateway-subsets/setup.sh index 850b47c68c9a..198e5a14ae37 100644 --- a/test/integration/connect/envoy/case-terminating-gateway-subsets/setup.sh +++ b/test/integration/connect/envoy/case-terminating-gateway-subsets/setup.sh @@ -11,4 +11,5 @@ register_services primary # terminating gateway will act as s2's proxy gen_envoy_bootstrap s1 19000 +gen_envoy_bootstrap s3 19001 gen_envoy_bootstrap terminating-gateway 20000 primary true diff --git a/test/integration/connect/envoy/case-terminating-gateway-subsets/vars.sh b/test/integration/connect/envoy/case-terminating-gateway-subsets/vars.sh index 9e52629b8bed..d4a4d75bdd88 100644 --- a/test/integration/connect/envoy/case-terminating-gateway-subsets/vars.sh +++ b/test/integration/connect/envoy/case-terminating-gateway-subsets/vars.sh @@ -4,5 +4,6 @@ export REQUIRED_SERVICES=" s1 s1-sidecar-proxy s2-v1 +s3 s3-sidecar-proxy terminating-gateway-primary " diff --git a/test/integration/connect/envoy/case-terminating-gateway-subsets/verify.bats b/test/integration/connect/envoy/case-terminating-gateway-subsets/verify.bats index 64a2499e3575..028ddea85ade 100644 --- a/test/integration/connect/envoy/case-terminating-gateway-subsets/verify.bats +++ b/test/integration/connect/envoy/case-terminating-gateway-subsets/verify.bats @@ -38,3 +38,7 @@ load helpers assert_envoy_metric_at_least 127.0.0.1:20000 "v1.s2.default.primary.*cx_total" 1 } +@test "terminating-gateway is used for the upstream connection of the proxy" { + # make sure we resolve the terminating gateway as endpoint for the upstream + assert_upstream_has_endpoint_port 127.0.0.1:19001 "v1.s2" 8443 +} diff --git a/test/integration/connect/envoy/helpers.bash b/test/integration/connect/envoy/helpers.bash index 2742c006afbb..f37c544fea52 100755 --- a/test/integration/connect/envoy/helpers.bash +++ b/test/integration/connect/envoy/helpers.bash @@ -346,6 +346,49 @@ function get_envoy_metrics { get_all_envoy_metrics $HOSTPORT | grep "$METRICS" } +function get_upstream_endpoint { + local HOSTPORT=$1 + local CLUSTER_NAME=$2 + run curl -s -f "http://${HOSTPORT}/clusters?format=json" + [ "$status" -eq 0 ] + echo "$output" | jq --raw-output " +.cluster_statuses[] +| select(.name|startswith(\"${CLUSTER_NAME}\"))" +} + +function get_upstream_endpoint_port { + local HOSTPORT=$1 + local CLUSTER_NAME=$2 + local PORT_VALUE=$3 + run curl -s -f "http://${HOSTPORT}/clusters?format=json" + [ "$status" -eq 0 ] + echo "$output" | jq --raw-output " +.cluster_statuses[] +| select(.name|startswith(\"${CLUSTER_NAME}\")) +| [.host_statuses[].address.socket_address.port_value] +| [select(.[] == ${PORT_VALUE})] +| length" +} + +function assert_upstream_has_endpoint_port_once { + local HOSTPORT=$1 + local CLUSTER_NAME=$2 + local PORT_VALUE=$3 + + GOT_COUNT=$(get_upstream_endpoint_port $HOSTPORT $CLUSTER_NAME $PORT_VALUE) + + [ "$GOT_COUNT" -eq 1 ] +} + +function assert_upstream_has_endpoint_port { + local HOSTPORT=$1 + local CLUSTER_NAME=$2 + local PORT_VALUE=$3 + + run retry_long assert_upstream_has_endpoint_port_once $HOSTPORT $CLUSTER_NAME $PORT_VALUE + [ "$status" -eq 0 ] +} + function get_upstream_endpoint_in_status_count { local HOSTPORT=$1 local CLUSTER_NAME=$2 From 71f6e1ab03c1750208700372a1b4bfec9616f803 Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Thu, 2 Mar 2023 16:50:02 -0500 Subject: [PATCH 3/4] Add changelog entry --- .changelog/16499.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changelog/16499.txt diff --git a/.changelog/16499.txt b/.changelog/16499.txt new file mode 100644 index 000000000000..4bd50db47e8a --- /dev/null +++ b/.changelog/16499.txt @@ -0,0 +1,3 @@ +```release-note:bug +mesh: Fix resolution of service resolvers with subsets for external upstreams +``` From c8b21457bbe0d59db01e33c401077100cc7cde4a Mon Sep 17 00:00:00 2001 From: Andrew Stucki Date: Fri, 3 Mar 2023 11:17:06 -0500 Subject: [PATCH 4/4] Update view filter logic --- agent/rpcclient/health/view.go | 26 ++++++++++---------------- agent/rpcclient/health/view_test.go | 5 +++-- 2 files changed, 13 insertions(+), 18 deletions(-) diff --git a/agent/rpcclient/health/view.go b/agent/rpcclient/health/view.go index 3cadea10564b..ce32e801c335 100644 --- a/agent/rpcclient/health/view.go +++ b/agent/rpcclient/health/view.go @@ -50,10 +50,10 @@ func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) { return nil, err } return &HealthView{ - state: make(map[string]structs.CheckServiceNode), - filter: fe, - name: req.ServiceName, - kind: req.ServiceKind, + state: make(map[string]structs.CheckServiceNode), + filter: fe, + connect: req.Connect, + kind: req.ServiceKind, }, nil } @@ -63,10 +63,10 @@ func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) { // (IndexedCheckServiceNodes) and update it in place for each event - that // involves re-sorting each time etc. though. type HealthView struct { - name string - kind structs.ServiceKind - state map[string]structs.CheckServiceNode - filter filterEvaluator + connect bool + kind structs.ServiceKind + state map[string]structs.CheckServiceNode + filter filterEvaluator } // Update implements View @@ -112,14 +112,8 @@ func (s *HealthView) Update(events []*pbsubscribe.Event) error { } func (s *HealthView) skipFilter(csn *structs.CheckServiceNode) bool { - // we only do this for services that need to be routed through a gateway - if s.kind != "" { - return false - } - if s.name != csn.Service.Service && csn.Service.Kind == structs.ServiceKindTerminatingGateway { - return true - } - return false + // we only do this for connect-enabled services that need to be routed through a terminating gateway + return s.kind == "" && s.connect && csn.Service.Kind == structs.ServiceKindTerminatingGateway } type filterEvaluator interface { diff --git a/agent/rpcclient/health/view_test.go b/agent/rpcclient/health/view_test.go index 4d0308380cc1..f1d2cd0869d3 100644 --- a/agent/rpcclient/health/view_test.go +++ b/agent/rpcclient/health/view_test.go @@ -944,7 +944,8 @@ func TestNewFilterEvaluator(t *testing.T) { func TestHealthView_SkipFilteringTerminatingGateways(t *testing.T) { view, err := NewHealthView(structs.ServiceSpecificRequest{ - ServiceName: "web", + ServiceName: "name", + Connect: true, QueryOptions: structs.QueryOptions{ Filter: "Service.Meta.version == \"v1\"", }, @@ -959,7 +960,7 @@ func TestHealthView_SkipFilteringTerminatingGateways(t *testing.T) { CheckServiceNode: &pbservice.CheckServiceNode{ Service: &pbservice.NodeService{ Kind: structs.TerminatingGateway, - Service: "gateway", + Service: "name", Address: "127.0.0.1", Port: 8443, },