Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Connect with Prepared Queries #5291

Merged
merged 4 commits into from
Feb 4, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
78 changes: 78 additions & 0 deletions agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2169,6 +2169,84 @@ func TestAgent_loadServices_sidecarSeparateToken(t *testing.T) {
}
}

func TestAgent_loadServices_sidecarInheritMeta(t *testing.T) {
t.Parallel()

a := NewTestAgent(t.Name(), `
service = {
id = "rabbitmq"
name = "rabbitmq"
port = 5672
tags = ["a", "b"],
meta = {
environment = "prod"
}
connect = {
sidecar_service {

}
}
}
`)
defer a.Shutdown()

services := a.State.Services()

svc, ok := services["rabbitmq"]
require.True(t, ok, "missing service")
require.Len(t, svc.Tags, 2)
require.Len(t, svc.Meta, 1)

sidecar, ok := services["rabbitmq-sidecar-proxy"]
require.True(t, ok, "missing sidecar service")
require.ElementsMatch(t, svc.Tags, sidecar.Tags)
require.Len(t, sidecar.Meta, 1)
meta, ok := sidecar.Meta["environment"]
require.True(t, ok, "missing sidecar service meta")
require.Equal(t, "prod", meta)
}

func TestAgent_loadServices_sidecarOverrideMeta(t *testing.T) {
t.Parallel()

a := NewTestAgent(t.Name(), `
service = {
id = "rabbitmq"
name = "rabbitmq"
port = 5672
tags = ["a", "b"],
meta = {
environment = "prod"
}
connect = {
sidecar_service {
tags = ["foo"],
meta = {
environment = "qa"
}
}
}
}
`)
defer a.Shutdown()

services := a.State.Services()

svc, ok := services["rabbitmq"]
require.True(t, ok, "missing service")
require.Len(t, svc.Tags, 2)
require.Len(t, svc.Meta, 1)

sidecar, ok := services["rabbitmq-sidecar-proxy"]
require.True(t, ok, "missing sidecar service")
require.Len(t, sidecar.Tags, 1)
require.Equal(t, "foo", sidecar.Tags[0])
require.Len(t, sidecar.Meta, 1)
meta, ok := sidecar.Meta["environment"]
require.True(t, ok, "missing sidecar service meta")
require.Equal(t, "qa", meta)
}

func TestAgent_unloadServices(t *testing.T) {
t.Parallel()
a := NewTestAgent(t.Name(), "")
Expand Down
15 changes: 15 additions & 0 deletions agent/consul/prepared_query_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,11 @@ func (p *PreparedQuery) execute(query *structs.PreparedQuery,
nodes = nodeMetaFilter(query.Service.NodeMeta, nodes)
}

// Apply the service metadata filters, if any.
if len(query.Service.ServiceMeta) > 0 {
nodes = serviceMetaFilter(query.Service.ServiceMeta, nodes)
}

mkeeler marked this conversation as resolved.
Show resolved Hide resolved
// Apply the tag filters, if any.
if len(query.Service.Tags) > 0 {
nodes = tagFilter(query.Service.Tags, nodes)
Expand Down Expand Up @@ -616,6 +621,16 @@ func nodeMetaFilter(filters map[string]string, nodes structs.CheckServiceNodes)
return filtered
}

func serviceMetaFilter(filters map[string]string, nodes structs.CheckServiceNodes) structs.CheckServiceNodes {
var filtered structs.CheckServiceNodes
for _, node := range nodes {
if structs.SatisfiesMetaFilters(node.Service.Meta, filters) {
filtered = append(filtered, node)
}
}
return filtered
}

// queryServer is a wrapper that makes it easier to test the failover logic.
type queryServer interface {
GetLogger() *log.Logger
Expand Down
68 changes: 67 additions & 1 deletion agent/consul/prepared_query_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1512,11 +1512,16 @@ func TestPreparedQuery_Execute(t *testing.T) {
Service: "foo",
Port: 8000,
Tags: []string{dc, fmt.Sprintf("tag%d", i+1)},
Meta: map[string]string{
"svc-group": fmt.Sprintf("%d", i%2),
"foo": "true",
},
},
WriteRequest: structs.WriteRequest{Token: "root"},
}
if i == 0 {
req.NodeMeta["unique"] = "true"
req.Service.Meta["unique"] = "true"
}

var codec rpc.ClientCodec
Expand Down Expand Up @@ -1617,7 +1622,7 @@ func TestPreparedQuery_Execute(t *testing.T) {
}

// Run various service queries with node metadata filters.
if false {
{
cases := []struct {
filters map[string]string
numNodes int
Expand Down Expand Up @@ -1682,6 +1687,67 @@ func TestPreparedQuery_Execute(t *testing.T) {
}
}

// Run various service queries with service metadata filters
{
cases := []struct {
filters map[string]string
numNodes int
}{
{
filters: map[string]string{},
numNodes: 10,
},
{
filters: map[string]string{"foo": "true"},
numNodes: 10,
},
{
filters: map[string]string{"svc-group": "0"},
numNodes: 5,
},
{
filters: map[string]string{"svc-group": "1"},
numNodes: 5,
},
{
filters: map[string]string{"svc-group": "0", "unique": "true"},
numNodes: 1,
},
}

for _, tc := range cases {
svcMetaQuery := structs.PreparedQueryRequest{
Datacenter: "dc1",
Op: structs.PreparedQueryCreate,
Query: &structs.PreparedQuery{
Service: structs.ServiceQuery{
Service: "foo",
ServiceMeta: tc.filters,
},
DNS: structs.QueryDNSOptions{
TTL: "10s",
},
},
WriteRequest: structs.WriteRequest{Token: "root"},
}

require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Apply", &svcMetaQuery, &svcMetaQuery.Query.ID))

req := structs.PreparedQueryExecuteRequest{
Datacenter: "dc1",
QueryIDOrName: svcMetaQuery.Query.ID,
QueryOptions: structs.QueryOptions{Token: execToken},
}

var reply structs.PreparedQueryExecuteResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec1, "PreparedQuery.Execute", &req, &reply))
require.Len(t, reply.Nodes, tc.numNodes)
for _, node := range reply.Nodes {
require.True(t, structs.SatisfiesMetaFilters(node.Service.Meta, tc.filters))
}
}
}

// Push a coordinate for one of the nodes so we can try an RTT sort. We
// have to sleep a little while for the coordinate batch to get flushed.
{
Expand Down
2 changes: 2 additions & 0 deletions agent/prepared_query_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func TestPreparedQuery_Create(t *testing.T) {
OnlyPassing: true,
Tags: []string{"foo", "bar"},
NodeMeta: map[string]string{"somekey": "somevalue"},
ServiceMeta: map[string]string{"env": "prod"},
},
DNS: structs.QueryDNSOptions{
TTL: "10s",
Expand Down Expand Up @@ -132,6 +133,7 @@ func TestPreparedQuery_Create(t *testing.T) {
"OnlyPassing": true,
"Tags": []string{"foo", "bar"},
"NodeMeta": map[string]string{"somekey": "somevalue"},
"ServiceMeta": map[string]string{"env": "prod"},
},
"DNS": map[string]interface{}{
"TTL": "10s",
Expand Down
15 changes: 15 additions & 0 deletions agent/sidecar_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,21 @@ func (a *Agent) sidecarServiceFromNodeService(ns *structs.NodeService, token str
}
}

// Copy the service metadata from the original service if no other meta was provided
if len(sidecar.Meta) == 0 && len(ns.Meta) > 0 {
if sidecar.Meta == nil {
sidecar.Meta = make(map[string]string)
}
for k, v := range ns.Meta {
sidecar.Meta[k] = v
}
}

// Copy the tags from the original service if no other tags were specified
if len(sidecar.Tags) == 0 && len(ns.Tags) > 0 {
sidecar.Tags = append(sidecar.Tags, ns.Tags...)
}

// Flag this as a sidecar - this is not persisted in catalog but only needed
// in local agent state to disambiguate lineage when deregistering the parent
// service later.
Expand Down
41 changes: 41 additions & 0 deletions agent/sidecar_service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ func TestAgent_sidecarServiceFromNodeService(t *testing.T) {
ID: "web1",
Name: "web",
Port: 1111,
Tags: []string{"baz"},
Meta: map[string]string{"foo": "baz"},
Connect: &structs.ServiceConnect{
SidecarService: &structs.ServiceDefinition{
Name: "motorbike1",
Expand Down Expand Up @@ -167,6 +169,45 @@ func TestAgent_sidecarServiceFromNodeService(t *testing.T) {
token: "foo",
wantErr: "auto-assignment disabled in config",
},
{
name: "inherit tags and meta",
sd: &structs.ServiceDefinition{
ID: "web1",
Name: "web",
Port: 1111,
Tags: []string{"foo"},
Meta: map[string]string{"foo": "bar"},
Connect: &structs.ServiceConnect{
SidecarService: &structs.ServiceDefinition{},
},
},
wantNS: &structs.NodeService{
Kind: structs.ServiceKindConnectProxy,
ID: "web1-sidecar-proxy",
Service: "web-sidecar-proxy",
Port: 2222,
Tags: []string{"foo"},
Meta: map[string]string{"foo": "bar"},
LocallyRegisteredAsSidecar: true,
Proxy: structs.ConnectProxyConfig{
DestinationServiceName: "web",
DestinationServiceID: "web1",
LocalServiceAddress: "127.0.0.1",
LocalServicePort: 1111,
},
},
wantChecks: []*structs.CheckType{
&structs.CheckType{
Name: "Connect Sidecar Listening",
TCP: "127.0.0.1:2222",
Interval: 10 * time.Second,
},
&structs.CheckType{
Name: "Connect Sidecar Aliasing web1",
AliasService: "web1",
},
},
},
{
name: "invalid check type",
sd: &structs.ServiceDefinition{
Expand Down
5 changes: 5 additions & 0 deletions agent/structs/prepared_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,11 @@ type ServiceQuery struct {
// service entry to be returned.
NodeMeta map[string]string

// ServiceMeta is a map of required service metadata fields. If a key/value
// pair is in this map it must be present on the node in order for the
// service entry to be returned.
ServiceMeta map[string]string

// Connect if true will filter the prepared query results to only
// include Connect-capable services. These include both native services
// and proxies for matching services. Note that if a proxy matches,
Expand Down
5 changes: 5 additions & 0 deletions api/prepared_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,11 @@ type ServiceQuery struct {
// service entry to be returned.
NodeMeta map[string]string

// ServiceMeta is a map of required service metadata fields. If a key/value
// pair is in this map it must be present on the node in order for the
// service entry to be returned.
ServiceMeta map[string]string

// Connect if true will filter the prepared query results to only
// include Connect-capable services. These include both native services
// and proxies for matching services. Note that if a proxy matches,
Expand Down
6 changes: 4 additions & 2 deletions api/prepared_query_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ func TestAPI_PreparedQuery(t *testing.T) {
ID: "redis1",
Service: "redis",
Tags: []string{"master", "v1"},
Meta: map[string]string{"redis-version": "4.0"},
Port: 8000,
},
}
Expand All @@ -43,8 +44,9 @@ func TestAPI_PreparedQuery(t *testing.T) {
def := &PreparedQueryDefinition{
Name: "test",
Service: ServiceQuery{
Service: "redis",
NodeMeta: map[string]string{"somekey": "somevalue"},
Service: "redis",
NodeMeta: map[string]string{"somekey": "somevalue"},
ServiceMeta: map[string]string{"redis-version": "4.0"},
},
}

Expand Down
10 changes: 8 additions & 2 deletions website/source/api/query.html.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,10 @@ The table below shows this endpoint's support for
key/value pairs that will be used for filtering the query results to nodes
with the given metadata values present.

- `ServiceMeta` `(map<string|string>: nil)` - Specifies a list of user-defined
key/value pairs that will be used for filtering the query results to services
with the given metadata values present.

- `Connect` `(bool: false)` - If true, only [Connect-capable](/docs/connect/index.html) services
for the specified service name will be returned. This includes both
natively integrated services and proxies. For proxies, the proxy name
Expand Down Expand Up @@ -263,7 +267,8 @@ The table below shows this endpoint's support for
"Near": "node1",
"OnlyPassing": false,
"Tags": ["primary", "!experimental"],
"NodeMeta": {"instance_type": "m3.large"}
"NodeMeta": {"instance_type": "m3.large"},
"ServiceMeta": {"environment": "production"}
},
"DNS": {
"TTL": "10s"
Expand Down Expand Up @@ -336,7 +341,8 @@ $ curl \
},
"OnlyPassing": false,
"Tags": ["primary", "!experimental"],
"NodeMeta": {"instance_type": "m3.large"}
"NodeMeta": {"instance_type": "m3.large"},
"ServiceMeta": {"environment": "production"}
},
"DNS": {
"TTL": "10s"
Expand Down
2 changes: 2 additions & 0 deletions website/source/docs/connect/proxies/sidecar-service.md
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ proxy.
be overridden as it is used to [manage the lifecycle](#lifecycle) of the
registration.
- `name` - Defaults to being `<parent-service-name>-sidecar-proxy`.
- `tags` - Defaults to the tags of the parent service.
- `meta` - Defaults to the service metadata of the parent service.
- `port` - Defaults to being auto-assigned from a [configurable
range](/docs/agent/options.html#sidecar_min_port) that is
by default `[21000, 21255]`.
Expand Down