Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

server: ensure that central service config flattening properly resets the state each time #10239

Merged
merged 2 commits into from
May 14, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .changelog/10239.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
```release-note:bug
server: ensure that central service config flattening properly resets the state each time
```
47 changes: 26 additions & 21 deletions agent/consul/config_endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,8 +323,9 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
&args.QueryOptions,
&reply.QueryMeta,
func(ws memdb.WatchSet, state *state.Store) error {
reply.Reset()
reply.MeshGateway.Mode = structs.MeshGatewayModeDefault
var thisReply structs.ServiceConfigResponse

thisReply.MeshGateway.Mode = structs.MeshGatewayModeDefault
// TODO(freddy) Refactor this into smaller set of state store functions
// Pass the WatchSet to both the service and proxy config lookups. If either is updated during the
// blocking query, this function will be rerun and these state store lookups will both be current.
Expand All @@ -349,11 +350,11 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
if err != nil {
return fmt.Errorf("failed to copy global proxy-defaults: %v", err)
}
reply.ProxyConfig = mapCopy.(map[string]interface{})
reply.Mode = proxyConf.Mode
reply.TransparentProxy = proxyConf.TransparentProxy
reply.MeshGateway = proxyConf.MeshGateway
reply.Expose = proxyConf.Expose
thisReply.ProxyConfig = mapCopy.(map[string]interface{})
thisReply.Mode = proxyConf.Mode
thisReply.TransparentProxy = proxyConf.TransparentProxy
thisReply.MeshGateway = proxyConf.MeshGateway
thisReply.Expose = proxyConf.Expose

// Extract the global protocol from proxyConf for upstream configs.
rawProtocol := proxyConf.Config["protocol"]
Expand All @@ -369,7 +370,7 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
if err != nil {
return err
}
reply.Index = index
thisReply.Index = index

var serviceConf *structs.ServiceConfigEntry
if serviceEntry != nil {
Expand All @@ -378,25 +379,25 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r
return fmt.Errorf("invalid service config type %T", serviceEntry)
}
if serviceConf.Expose.Checks {
reply.Expose.Checks = true
thisReply.Expose.Checks = true
}
if len(serviceConf.Expose.Paths) >= 1 {
reply.Expose.Paths = serviceConf.Expose.Paths
thisReply.Expose.Paths = serviceConf.Expose.Paths
}
if serviceConf.MeshGateway.Mode != structs.MeshGatewayModeDefault {
reply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
thisReply.MeshGateway.Mode = serviceConf.MeshGateway.Mode
}
if serviceConf.Protocol != "" {
if reply.ProxyConfig == nil {
reply.ProxyConfig = make(map[string]interface{})
if thisReply.ProxyConfig == nil {
thisReply.ProxyConfig = make(map[string]interface{})
}
reply.ProxyConfig["protocol"] = serviceConf.Protocol
thisReply.ProxyConfig["protocol"] = serviceConf.Protocol
}
if serviceConf.TransparentProxy.OutboundListenerPort != 0 {
reply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
thisReply.TransparentProxy.OutboundListenerPort = serviceConf.TransparentProxy.OutboundListenerPort
}
if serviceConf.Mode != structs.ProxyModeDefault {
reply.Mode = serviceConf.Mode
thisReply.Mode = serviceConf.Mode
}
}

Expand All @@ -414,13 +415,14 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r

// Check the args and the resolved value. If it was exclusively set via a config entry, then args.Mode
// will never be transparent because the service config request does not use the resolved value.
tproxy = args.Mode == structs.ProxyModeTransparent || reply.Mode == structs.ProxyModeTransparent
tproxy = args.Mode == structs.ProxyModeTransparent || thisReply.Mode == structs.ProxyModeTransparent
)

// The upstreams passed as arguments to this endpoint are the upstreams explicitly defined in a proxy registration.
// If no upstreams were passed, then we should only returned the resolved config if the proxy in transparent mode.
// Otherwise we would return a resolved upstream config to a proxy with no configured upstreams.
if noUpstreamArgs && !tproxy {
*reply = thisReply
return nil
}

Expand Down Expand Up @@ -534,25 +536,28 @@ func (c *ConfigEntry) ResolveServiceConfig(args *structs.ServiceConfigRequest, r

// don't allocate the slices just to not fill them
if len(usConfigs) == 0 {
*reply = thisReply
return nil
}

if legacyUpstreams {
// For legacy upstreams we return a map that is only keyed on the string ID, since they precede namespaces
reply.UpstreamConfigs = make(map[string]map[string]interface{})
thisReply.UpstreamConfigs = make(map[string]map[string]interface{})

for us, conf := range usConfigs {
reply.UpstreamConfigs[us.ID] = conf
thisReply.UpstreamConfigs[us.ID] = conf
}

} else {
reply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))
thisReply.UpstreamIDConfigs = make(structs.OpaqueUpstreamConfigs, 0, len(usConfigs))

for us, conf := range usConfigs {
reply.UpstreamIDConfigs = append(reply.UpstreamIDConfigs,
thisReply.UpstreamIDConfigs = append(thisReply.UpstreamIDConfigs,
structs.OpaqueUpstreamConfig{Upstream: us, Config: conf})
}
}

*reply = thisReply
return nil
})
}
Expand Down
209 changes: 209 additions & 0 deletions agent/consul/config_endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1420,6 +1420,9 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
// of the blocking query does NOT bleed over into the next run. Concretely
// in this test the data present in the initial proxy-defaults should not
// be present when we are woken up due to proxy-defaults being deleted.
//
// This test does not pertain to upstreams, see:
// TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking

state := s1.fsm.State()
require.NoError(state.EnsureConfigEntry(1, &structs.ProxyConfigEntry{
Expand Down Expand Up @@ -1571,6 +1574,205 @@ func TestConfigEntry_ResolveServiceConfig_Blocking(t *testing.T) {
}
}

func TestConfigEntry_ResolveServiceConfig_Upstreams_Blocking(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
}

t.Parallel()

dir1, s1 := testServer(t)
defer os.RemoveAll(dir1)
defer s1.Shutdown()
codec := rpcClient(t, s1)
defer codec.Close()

// The main thing this should test is that information from one iteration
// of the blocking query does NOT bleed over into the next run. Concretely
// in this test the data present in the initial proxy-defaults should not
// be present when we are woken up due to proxy-defaults being deleted.
//
// This test is about fields in upstreams, see:
// TestConfigEntry_ResolveServiceConfig_Blocking

state := s1.fsm.State()
require.NoError(t, state.EnsureConfigEntry(1, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "foo",
Protocol: "http",
}))
require.NoError(t, state.EnsureConfigEntry(2, &structs.ServiceConfigEntry{
Kind: structs.ServiceDefaults,
Name: "bar",
Protocol: "http",
}))

var index uint64

runStep(t, "foo and bar should be both http", func(t *testing.T) {
// Verify that we get the results of service-defaults for 'foo' and 'bar'.
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
&structs.ServiceConfigRequest{
Name: "foo",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{
structs.NewServiceID("bar", nil),
structs.NewServiceID("other", nil),
},
},
&out,
))

expected := structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"protocol": "http",
},
UpstreamIDConfigs: []structs.OpaqueUpstreamConfig{
{
Upstream: structs.NewServiceID("bar", nil),
Config: map[string]interface{}{
"protocol": "http",
},
},
},
QueryMeta: out.QueryMeta, // don't care
}

require.Equal(t, expected, out)
index = out.Index
})

runStep(t, "blocking query for foo wakes on bar entry delete", func(t *testing.T) {
// Now setup a blocking query for 'foo' while we erase the
// service-defaults for bar.

// Async cause a change
start := time.Now()
go func() {
time.Sleep(100 * time.Millisecond)
err := state.DeleteConfigEntry(index+1,
structs.ServiceDefaults,
"bar",
nil,
)
if err != nil {
t.Errorf("delete config entry failed: %v", err)
}
}()

// Re-run the query
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
&structs.ServiceConfigRequest{
Name: "foo",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{
structs.NewServiceID("bar", nil),
structs.NewServiceID("other", nil),
},
QueryOptions: structs.QueryOptions{
MinQueryIndex: index,
MaxQueryTime: time.Second,
},
},
&out,
))

// Should block at least 100ms
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")

// Check the indexes
require.Equal(t, out.Index, index+1)

expected := structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"protocol": "http",
},
QueryMeta: out.QueryMeta, // don't care
}

require.Equal(t, expected, out)
index = out.Index
})

runStep(t, "foo should be http and bar should be unset", func(t *testing.T) {
// Verify that we get the results of service-defaults for just 'foo'.
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
&structs.ServiceConfigRequest{
Name: "foo",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{
structs.NewServiceID("bar", nil),
structs.NewServiceID("other", nil),
},
},
&out,
))

expected := structs.ServiceConfigResponse{
ProxyConfig: map[string]interface{}{
"protocol": "http",
},
QueryMeta: out.QueryMeta, // don't care
}

require.Equal(t, expected, out)
index = out.Index
})

runStep(t, "blocking query for foo wakes on foo entry delete", func(t *testing.T) {
// Now setup a blocking query for 'foo' while we erase the
// service-defaults for foo.

// Async cause a change
start := time.Now()
go func() {
time.Sleep(100 * time.Millisecond)
err := state.DeleteConfigEntry(index+1,
structs.ServiceDefaults,
"foo",
nil,
)
if err != nil {
t.Errorf("delete config entry failed: %v", err)
}
}()

// Re-run the query
var out structs.ServiceConfigResponse
require.NoError(t, msgpackrpc.CallWithCodec(codec, "ConfigEntry.ResolveServiceConfig",
&structs.ServiceConfigRequest{
Name: "foo",
Datacenter: "dc1",
UpstreamIDs: []structs.ServiceID{
structs.NewServiceID("bar", nil),
structs.NewServiceID("other", nil),
},
QueryOptions: structs.QueryOptions{
MinQueryIndex: index,
MaxQueryTime: time.Second,
},
},
&out,
))

// Should block at least 100ms
require.True(t, time.Since(start) >= 100*time.Millisecond, "too fast")

// Check the indexes
require.Equal(t, out.Index, index+1)

expected := structs.ServiceConfigResponse{
QueryMeta: out.QueryMeta, // don't care
}

require.Equal(t, expected, out)
index = out.Index
})
}

func TestConfigEntry_ResolveServiceConfig_UpstreamProxyDefaultsProtocol(t *testing.T) {
if testing.Short() {
t.Skip("too slow for testing.Short")
Expand Down Expand Up @@ -1848,3 +2050,10 @@ func TestConfigEntry_ProxyDefaultsExposeConfig(t *testing.T) {
require.True(t, ok)
require.Equal(t, expose, proxyConf.Expose)
}

func runStep(t *testing.T, name string, fn func(t *testing.T)) {
t.Helper()
if !t.Run(name, fn) {
t.FailNow()
}
}
6 changes: 0 additions & 6 deletions agent/structs/config_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -966,12 +966,6 @@ type ServiceConfigResponse struct {
QueryMeta
}

func (r *ServiceConfigResponse) Reset() {
r.ProxyConfig = nil
r.UpstreamConfigs = nil
r.MeshGateway = MeshGatewayConfig{}
}

// MarshalBinary writes ServiceConfigResponse as msgpack encoded. It's only here
// because we need custom decoding of the raw interface{} values.
func (r *ServiceConfigResponse) MarshalBinary() (data []byte, err error) {
Expand Down