From 3c5c911b690bcf67d3d97c0ddb0fd221da8d6202 Mon Sep 17 00:00:00 2001 From: Blake Rouse Date: Wed, 7 Oct 2020 18:52:51 -0500 Subject: [PATCH] [Elastic Agent] Reload fleet.kibana.hosts from policy change (#21599) * Update the connected client for kibana from policy change. * Fix vet. * Add changelog. * Add protocol compare. * Rollback protocol and hosts on failure. (cherry picked from commit 189171723ce69e3a235ef16a80c870db74fba682) --- x-pack/elastic-agent/CHANGELOG.next.asciidoc | 1 + .../pkg/agent/application/fleet_acker.go | 4 + .../pkg/agent/application/fleet_gateway.go | 4 + .../handler_action_policy_change.go | 107 +++++++++++++++++- .../handler_action_policy_change_test.go | 45 +++++++- .../pkg/agent/application/managed_mode.go | 16 ++- .../agent/application/managed_mode_test.go | 12 +- .../pkg/agent/storage/storage.go | 8 +- 8 files changed, 182 insertions(+), 15 deletions(-) diff --git a/x-pack/elastic-agent/CHANGELOG.next.asciidoc b/x-pack/elastic-agent/CHANGELOG.next.asciidoc index a0cd6f6291ed..73dfe8ff9a42 100644 --- a/x-pack/elastic-agent/CHANGELOG.next.asciidoc +++ b/x-pack/elastic-agent/CHANGELOG.next.asciidoc @@ -33,3 +33,4 @@ - Send updating state {pull}21461[21461] - Add `elastic.agent.id` and `elastic.agent.version` to published events from filebeat and metricbeat {pull}21543[21543] - Add `upgrade` subcommand to perform upgrade of installed Elastic Agent {pull}21425[21425] +- Update `fleet.yml` and Kibana hosts when a policy change updates the Kibana hosts {pull}21599[21599] diff --git a/x-pack/elastic-agent/pkg/agent/application/fleet_acker.go b/x-pack/elastic-agent/pkg/agent/application/fleet_acker.go index ba324f2d7de6..4544fa8a772f 100644 --- a/x-pack/elastic-agent/pkg/agent/application/fleet_acker.go +++ b/x-pack/elastic-agent/pkg/agent/application/fleet_acker.go @@ -39,6 +39,10 @@ func newActionAcker( }, nil } +func (f *actionAcker) SetClient(client clienter) { + f.client = client +} + func (f *actionAcker) Ack(ctx context.Context, action fleetapi.Action) error { // checkin agentID := f.agentInfo.AgentID() diff --git a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go index 4bb9d2e6280a..21e7bfd45890 100644 --- a/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go +++ b/x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go @@ -253,3 +253,7 @@ func (f *fleetGateway) stop() { close(f.done) f.wg.Wait() } + +func (f *fleetGateway) SetClient(client clienter) { + f.client = client +} diff --git a/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go b/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go index 81dc1444816d..8821700ee747 100644 --- a/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go +++ b/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change.go @@ -5,17 +5,36 @@ package application import ( + "bytes" "context" "fmt" + "io" + "sort" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" + + "gopkg.in/yaml.v2" + + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/kibana" ) +type clientSetter interface { + SetClient(clienter) +} + type handlerPolicyChange struct { - log *logger.Logger - emitter emitterFunc + log *logger.Logger + emitter emitterFunc + agentInfo *info.AgentInfo + config *configuration.Configuration + store storage.Store + setters []clientSetter } func (h *handlerPolicyChange) Handle(ctx context.Context, a action, acker fleetAcker) error { @@ -31,9 +50,93 @@ func (h *handlerPolicyChange) Handle(ctx context.Context, a action, acker fleetA } h.log.Debugf("handlerPolicyChange: emit configuration for action %+v", a) + err = h.handleKibanaHosts(c) + if err != nil { + return err + } if err := h.emitter(c); err != nil { return err } return acker.Ack(ctx, action) } + +func (h *handlerPolicyChange) handleKibanaHosts(c *config.Config) (err error) { + cfg, err := configuration.NewFromConfig(c) + if err != nil { + return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig) + } + if kibanaEqual(h.config.Fleet.Kibana, cfg.Fleet.Kibana) { + // already the same hosts + return nil + } + + // only set protocol/hosts as that is all Fleet currently sends + prevProtocol := h.config.Fleet.Kibana.Protocol + prevHosts := h.config.Fleet.Kibana.Hosts + h.config.Fleet.Kibana.Protocol = cfg.Fleet.Kibana.Protocol + h.config.Fleet.Kibana.Hosts = cfg.Fleet.Kibana.Hosts + + // rollback on failure + defer func() { + if err != nil { + h.config.Fleet.Kibana.Protocol = prevProtocol + h.config.Fleet.Kibana.Hosts = prevHosts + } + }() + + client, err := fleetapi.NewAuthWithConfig(h.log, h.config.Fleet.AccessAPIKey, h.config.Fleet.Kibana) + if err != nil { + return errors.New( + err, "fail to create API client with updated hosts", + errors.TypeNetwork, errors.M("hosts", h.config.Fleet.Kibana.Hosts)) + } + reader, err := fleetToReader(h.agentInfo, h.config) + if err != nil { + return errors.New( + err, "fail to persist updated API client hosts", + errors.TypeUnexpected, errors.M("hosts", h.config.Fleet.Kibana.Hosts)) + } + err = h.store.Save(reader) + if err != nil { + return errors.New( + err, "fail to persist updated API client hosts", + errors.TypeFilesystem, errors.M("hosts", h.config.Fleet.Kibana.Hosts)) + } + for _, setter := range h.setters { + setter.SetClient(client) + } + return nil +} + +func kibanaEqual(k1 *kibana.Config, k2 *kibana.Config) bool { + if k1.Protocol != k2.Protocol { + return false + } + + sort.Strings(k1.Hosts) + sort.Strings(k2.Hosts) + if len(k1.Hosts) != len(k2.Hosts) { + return false + } + for i, v := range k1.Hosts { + if v != k2.Hosts[i] { + return false + } + } + return true +} + +func fleetToReader(agentInfo *info.AgentInfo, cfg *configuration.Configuration) (io.Reader, error) { + configToStore := map[string]interface{}{ + "fleet": cfg.Fleet, + "agent": map[string]interface{}{ + "id": agentInfo.AgentID(), + }, + } + data, err := yaml.Marshal(configToStore) + if err != nil { + return nil, err + } + return bytes.NewReader(data), nil +} diff --git a/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change_test.go b/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change_test.go index ce4802b68e68..c30f886b0d17 100644 --- a/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/handler_action_policy_change_test.go @@ -9,6 +9,10 @@ import ( "sync" "testing" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage" + "github.com/pkg/errors" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -31,6 +35,8 @@ func (m *mockEmitter) Emitter(policy *config.Config) error { func TestPolicyChange(t *testing.T) { log, _ := logger.New("") ack := newNoopAcker() + agentInfo, _ := info.NewAgentInfo() + nullStore := &storage.NullStore{} t.Run("Receive a config change and successfully emits a raw configuration", func(t *testing.T) { emitter := &mockEmitter{} @@ -42,7 +48,14 @@ func TestPolicyChange(t *testing.T) { Policy: conf, } - handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter} + cfg := configuration.DefaultConfiguration() + handler := &handlerPolicyChange{ + log: log, + emitter: emitter.Emitter, + agentInfo: agentInfo, + config: cfg, + store: nullStore, + } err := handler.Handle(context.Background(), action, ack) require.NoError(t, err) @@ -60,7 +73,14 @@ func TestPolicyChange(t *testing.T) { Policy: conf, } - handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter} + cfg := configuration.DefaultConfiguration() + handler := &handlerPolicyChange{ + log: log, + emitter: emitter.Emitter, + agentInfo: agentInfo, + config: cfg, + store: nullStore, + } err := handler.Handle(context.Background(), action, ack) require.Error(t, err) @@ -69,6 +89,9 @@ func TestPolicyChange(t *testing.T) { func TestPolicyAcked(t *testing.T) { log, _ := logger.New("") + agentInfo, _ := info.NewAgentInfo() + nullStore := &storage.NullStore{} + t.Run("Config change should not ACK on error", func(t *testing.T) { tacker := &testAcker{} @@ -83,7 +106,14 @@ func TestPolicyAcked(t *testing.T) { Policy: config, } - handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter} + cfg := configuration.DefaultConfiguration() + handler := &handlerPolicyChange{ + log: log, + emitter: emitter.Emitter, + agentInfo: agentInfo, + config: cfg, + store: nullStore, + } err := handler.Handle(context.Background(), action, tacker) require.Error(t, err) @@ -105,7 +135,14 @@ func TestPolicyAcked(t *testing.T) { Policy: config, } - handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter} + cfg := configuration.DefaultConfiguration() + handler := &handlerPolicyChange{ + log: log, + emitter: emitter.Emitter, + agentInfo: agentInfo, + config: cfg, + store: nullStore, + } err := handler.Handle(context.Background(), action, tacker) require.NoError(t, err) diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go index ea39bddd6c01..d3a04af7df46 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode.go @@ -213,12 +213,17 @@ func newManaged( acker, combinedReporter) + policyChanger := &handlerPolicyChange{ + log: log, + emitter: emit, + agentInfo: agentInfo, + config: cfg, + store: store, + setters: []clientSetter{acker}, + } actionDispatcher.MustRegister( &fleetapi.ActionPolicyChange{}, - &handlerPolicyChange{ - log: log, - emitter: emit, - }, + policyChanger, ) actionDispatcher.MustRegister( @@ -268,6 +273,9 @@ func newManaged( if err != nil { return nil, err } + // add the gateway to setters, so the gateway can be updated + // when the hosts for Kibana are updated by the policy. + policyChanger.setters = append(policyChanger.setters, gateway) managedApplication.gateway = gateway return managedApplication, nil diff --git a/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go b/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go index 65cb27547ff8..4325c5ce7a63 100644 --- a/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go +++ b/x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go @@ -9,11 +9,14 @@ import ( "encoding/json" "testing" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest" + "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger" "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi" @@ -34,6 +37,7 @@ func TestManagedModeRouting(t *testing.T) { log, _ := logger.New("") router, _ := newRouter(log, streamFn) agentInfo, _ := info.NewAgentInfo() + nullStore := &storage.NullStore{} composableCtrl, _ := composable.New(log, nil) emit, err := emitter(ctx, log, agentInfo, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}}) require.NoError(t, err) @@ -41,11 +45,15 @@ func TestManagedModeRouting(t *testing.T) { actionDispatcher, err := newActionDispatcher(ctx, log, &handlerDefault{log: log}) require.NoError(t, err) + cfg := configuration.DefaultConfiguration() actionDispatcher.MustRegister( &fleetapi.ActionPolicyChange{}, &handlerPolicyChange{ - log: log, - emitter: emit, + log: log, + emitter: emit, + agentInfo: agentInfo, + config: cfg, + store: nullStore, }, ) diff --git a/x-pack/elastic-agent/pkg/agent/storage/storage.go b/x-pack/elastic-agent/pkg/agent/storage/storage.go index b37c6e06ab34..2435311486a3 100644 --- a/x-pack/elastic-agent/pkg/agent/storage/storage.go +++ b/x-pack/elastic-agent/pkg/agent/storage/storage.go @@ -20,7 +20,9 @@ import ( const perms os.FileMode = 0600 -type store interface { +// Store saves the io.Reader. +type Store interface { + // Save the io.Reader. Save(io.Reader) error } @@ -62,12 +64,12 @@ type ReplaceOnSuccessStore struct { target string replaceWith []byte - wrapped store + wrapped Store } // NewReplaceOnSuccessStore takes a target file and a replacement content and will replace the target // file content if the wrapped store execution is done without any error. -func NewReplaceOnSuccessStore(target string, replaceWith []byte, wrapped store) *ReplaceOnSuccessStore { +func NewReplaceOnSuccessStore(target string, replaceWith []byte, wrapped Store) *ReplaceOnSuccessStore { return &ReplaceOnSuccessStore{ target: target, replaceWith: replaceWith,