Manual Backport of Fix resolution of service resolvers with subsets for external upstreams into release/1.13.x #16559

Merged · 4 commits · Mar 7, 2023
3 changes: 3 additions & 0 deletions .changelog/16499.txt
@@ -0,0 +1,3 @@
```release-note:bug
mesh: Fix resolution of service resolvers with subsets for external upstreams
```
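
For context, the bug class here involves a service-resolver that defines subsets for a service that is reachable only through a terminating gateway. Below is a minimal sketch of such a config entry, built with Consul's structs package; the service names and version filters are illustrative, not taken from this PR:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/consul/agent/structs"
)

func main() {
	// Illustrative resolver: split the external service "s2" into subsets
	// keyed on Service.Meta.version. The subset filter is written against
	// the destination service's metadata, which the terminating gateway
	// fronting it does not carry; hence the health view must not apply
	// the filter to the gateway's own catalog entry (see view.go below).
	resolver := &structs.ServiceResolverConfigEntry{
		Kind:          structs.ServiceResolver,
		Name:          "s2",
		DefaultSubset: "v1",
		Subsets: map[string]structs.ServiceResolverSubset{
			"v1": {Filter: `Service.Meta.version == "v1"`},
			"v2": {Filter: `Service.Meta.version == "v2"`},
		},
	}
	fmt.Println(resolver.Validate())
}
```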
24 changes: 20 additions & 4 deletions agent/rpcclient/health/view.go
@@ -50,8 +50,10 @@ func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) {
		return nil, err
	}
	return &HealthView{
-		state:  make(map[string]structs.CheckServiceNode),
-		filter: fe,
+		state:   make(map[string]structs.CheckServiceNode),
+		filter:  fe,
+		connect: req.Connect,
+		kind:    req.ServiceKind,
	}, nil
}

@@ -61,8 +63,10 @@ func NewHealthView(req structs.ServiceSpecificRequest) (*HealthView, error) {
// (IndexedCheckServiceNodes) and update it in place for each event - that
// involves re-sorting each time etc. though.
type HealthView struct {
-	state  map[string]structs.CheckServiceNode
-	filter filterEvaluator
+	connect bool
+	kind    structs.ServiceKind
+	state   map[string]structs.CheckServiceNode
+	filter  filterEvaluator
}

// Update implements View
@@ -84,6 +88,13 @@ func (s *HealthView) Update(events []*pbsubscribe.Event) error {
			if csn == nil {
				return errors.New("check service node was unexpectedly nil")
			}

			// check if we intentionally need to skip the filter
			if s.skipFilter(csn) {
				s.state[id] = *csn
				continue
			}

			passed, err := s.filter.Evaluate(*csn)
			if err != nil {
				return err
@@ -100,6 +111,11 @@ func (s *HealthView) Update(events []*pbsubscribe.Event) error {
	return nil
}

func (s *HealthView) skipFilter(csn *structs.CheckServiceNode) bool {
	// we only do this for connect-enabled services that need to be routed through a terminating gateway
	return s.kind == "" && s.connect && csn.Service.Kind == structs.ServiceKindTerminatingGateway
}

type filterEvaluator interface {
	Evaluate(datum interface{}) (bool, error)
}
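
To see why the bypass is needed: a terminating gateway's own registration does not carry the destination service's metadata, so a subset filter such as Service.Meta.version == "v1" would discard the gateway even though it is the only path to the external service. A rough sketch of the predicate in action (this relies on the unexported fields above, so it only compiles alongside view.go; the node values are hypothetical):

```go
func ExampleHealthView_skipFilter() {
	// A gateway node as it appears in the catalog: no version metadata.
	gw := &structs.CheckServiceNode{
		Service: &structs.NodeService{
			Kind:    structs.ServiceKindTerminatingGateway,
			Service: "terminating-gateway",
		},
	}
	// A connect query for a plain (non-gateway) upstream service.
	view := &HealthView{connect: true, kind: ""}
	fmt.Println(view.skipFilter(gw)) // true: keep the node, bypass the filter
}
```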
36 changes: 36 additions & 0 deletions agent/rpcclient/health/view_test.go
@@ -941,3 +941,39 @@ func TestNewFilterEvaluator(t *testing.T) {
		})
	}
}

func TestHealthView_SkipFilteringTerminatingGateways(t *testing.T) {
	view, err := NewHealthView(structs.ServiceSpecificRequest{
		ServiceName: "name",
		Connect:     true,
		QueryOptions: structs.QueryOptions{
			Filter: "Service.Meta.version == \"v1\"",
		},
	})
	require.NoError(t, err)

	err = view.Update([]*pbsubscribe.Event{{
		Index: 1,
		Payload: &pbsubscribe.Event_ServiceHealth{
			ServiceHealth: &pbsubscribe.ServiceHealthUpdate{
				Op: pbsubscribe.CatalogOp_Register,
				CheckServiceNode: &pbservice.CheckServiceNode{
					Service: &pbservice.NodeService{
						Kind:    structs.TerminatingGateway,
						Service: "name",
						Address: "127.0.0.1",
						Port:    8443,
					},
				},
			},
		},
	}})
	require.NoError(t, err)

	node, ok := (view.Result(1)).(*structs.IndexedCheckServiceNodes)
	require.True(t, ok)

	require.Len(t, node.Nodes, 1)
	require.Equal(t, "127.0.0.1", node.Nodes[0].Service.Address)
	require.Equal(t, 8443, node.Nodes[0].Service.Port)
}
@@ -2,3 +2,4 @@

snapshot_envoy_admin localhost:20000 terminating-gateway primary || true
snapshot_envoy_admin localhost:19000 s1 primary || true
snapshot_envoy_admin localhost:19001 s3 primary || true
@@ -0,0 +1,17 @@
services {
  id   = "s3"
  name = "s3"
  port = 8184
  connect {
    sidecar_service {
      proxy {
        upstreams = [
          {
            destination_name = "s2"
            local_bind_port  = 8185
          }
        ]
      }
    }
  }
}
@@ -11,4 +11,5 @@ register_services primary

# terminating gateway will act as s2's proxy
gen_envoy_bootstrap s1 19000
gen_envoy_bootstrap s3 19001
gen_envoy_bootstrap terminating-gateway 20000 primary true
@@ -4,5 +4,6 @@
export REQUIRED_SERVICES="
s1 s1-sidecar-proxy
s2-v1
s3 s3-sidecar-proxy
terminating-gateway-primary
"
@@ -38,3 +38,7 @@ load helpers
  assert_envoy_metric_at_least 127.0.0.1:20000 "v1.s2.default.primary.*cx_total" 1
}

@test "terminating-gateway is used for the upstream connection of the proxy" {
# make sure we resolve the terminating gateway as endpoint for the upstream
assert_upstream_has_endpoint_port 127.0.0.1:19001 "v1.s2" 8443
}
43 changes: 43 additions & 0 deletions test/integration/connect/envoy/helpers.bash
@@ -346,6 +346,49 @@ function get_envoy_metrics {
  get_all_envoy_metrics $HOSTPORT | grep "$METRICS"
}

function get_upstream_endpoint {
  local HOSTPORT=$1
  local CLUSTER_NAME=$2
  run curl -s -f "http://${HOSTPORT}/clusters?format=json"
  [ "$status" -eq 0 ]
  echo "$output" | jq --raw-output "
    .cluster_statuses[]
    | select(.name|startswith(\"${CLUSTER_NAME}\"))"
}

function get_upstream_endpoint_port {
  local HOSTPORT=$1
  local CLUSTER_NAME=$2
  local PORT_VALUE=$3
  run curl -s -f "http://${HOSTPORT}/clusters?format=json"
  [ "$status" -eq 0 ]
  echo "$output" | jq --raw-output "
    .cluster_statuses[]
    | select(.name|startswith(\"${CLUSTER_NAME}\"))
    | [.host_statuses[].address.socket_address.port_value]
    | [select(.[] == ${PORT_VALUE})]
    | length"
}
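
For readers less fluent in jq, here is a rough Go equivalent of the pipeline in get_upstream_endpoint_port, assuming the shape of Envoy's /clusters?format=json admin output; the helper name and example values are illustrative:

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"strings"
)

// clustersResponse mirrors Envoy's /clusters?format=json admin output,
// reduced to the fields the jq pipeline above inspects.
type clustersResponse struct {
	ClusterStatuses []struct {
		Name         string `json:"name"`
		HostStatuses []struct {
			Address struct {
				SocketAddress struct {
					PortValue int `json:"port_value"`
				} `json:"socket_address"`
			} `json:"address"`
		} `json:"host_statuses"`
	} `json:"cluster_statuses"`
}

// endpointPortCount counts how many endpoints of clusters whose name starts
// with clusterPrefix listen on port, like get_upstream_endpoint_port does.
func endpointPortCount(hostport, clusterPrefix string, port int) (int, error) {
	resp, err := http.Get("http://" + hostport + "/clusters?format=json")
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	var cs clustersResponse
	if err := json.NewDecoder(resp.Body).Decode(&cs); err != nil {
		return 0, err
	}
	count := 0
	for _, c := range cs.ClusterStatuses {
		if !strings.HasPrefix(c.Name, clusterPrefix) {
			continue
		}
		for _, h := range c.HostStatuses {
			if h.Address.SocketAddress.PortValue == port {
				count++
			}
		}
	}
	return count, nil
}

func main() {
	// e.g. the assertion in verify.bats: v1.s2 resolved to the gateway on 8443.
	n, err := endpointPortCount("127.0.0.1:19001", "v1.s2", 8443)
	fmt.Println(n, err)
}
```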

function assert_upstream_has_endpoint_port_once {
  local HOSTPORT=$1
  local CLUSTER_NAME=$2
  local PORT_VALUE=$3

  GOT_COUNT=$(get_upstream_endpoint_port $HOSTPORT $CLUSTER_NAME $PORT_VALUE)

  [ "$GOT_COUNT" -eq 1 ]
}

function assert_upstream_has_endpoint_port {
  local HOSTPORT=$1
  local CLUSTER_NAME=$2
  local PORT_VALUE=$3

  run retry_long assert_upstream_has_endpoint_port_once $HOSTPORT $CLUSTER_NAME $PORT_VALUE
  [ "$status" -eq 0 ]
}

function get_upstream_endpoint_in_status_count {
  local HOSTPORT=$1
  local CLUSTER_NAME=$2