Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Elastic Agent] Reload fleet.kibana.hosts from policy change #21599

Merged
merged 5 commits into from
Oct 7, 2020
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions x-pack/elastic-agent/CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@
- Send updating state {pull}21461[21461]
- Add `elastic.agent.id` and `elastic.agent.version` to published events from filebeat and metricbeat {pull}21543[21543]
- Add `upgrade` subcommand to perform upgrade of installed Elastic Agent {pull}21425[21425]
- Update `fleet.yml` and Kibana hosts when a policy change updates the Kibana hosts {pull}21599[21599]
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/fleet_acker.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ func newActionAcker(
}, nil
}

func (f *actionAcker) SetClient(client clienter) {
f.client = client
}

func (f *actionAcker) Ack(ctx context.Context, action fleetapi.Action) error {
// checkin
agentID := f.agentInfo.AgentID()
Expand Down
4 changes: 4 additions & 0 deletions x-pack/elastic-agent/pkg/agent/application/fleet_gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,3 +253,7 @@ func (f *fleetGateway) stop() {
close(f.done)
f.wg.Wait()
}

func (f *fleetGateway) SetClient(client clienter) {
f.client = client
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we should here and in acker make sure client is not change while performing execute/ack action.
at least it can be misleading e.g

  • performin a call using client1 - fails
  • client is updated
  • reporting failure from step 1 but client.URI is different now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I think this flow is correct, because each action from Fleet is handled synchronously in the Agent. So when a policy change comes in with an updated hosts, they action will not be Ack'd until this code is able to re-connect back to Kibana using the new hosts information.

fleetapi.NewAuthWithConfig ensures that the created client can communicate with Kibana, so that means the Ack will not happen until the updated client is created and set.

}
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,36 @@
package application

import (
"bytes"
"context"
"fmt"
"io"
"sort"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"

"gopkg.in/yaml.v2"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/kibana"
)

type clientSetter interface {
SetClient(clienter)
}

type handlerPolicyChange struct {
log *logger.Logger
emitter emitterFunc
log *logger.Logger
emitter emitterFunc
agentInfo *info.AgentInfo
config *configuration.Configuration
store storage.Store
setters []clientSetter
}

func (h *handlerPolicyChange) Handle(ctx context.Context, a action, acker fleetAcker) error {
Expand All @@ -31,9 +50,81 @@ func (h *handlerPolicyChange) Handle(ctx context.Context, a action, acker fleetA
}

h.log.Debugf("handlerPolicyChange: emit configuration for action %+v", a)
err = h.handleKibanaHosts(c)
if err != nil {
return err
}
if err := h.emitter(c); err != nil {
return err
}

return acker.Ack(ctx, action)
}

func (h *handlerPolicyChange) handleKibanaHosts(c *config.Config) error {
cfg, err := configuration.NewFromConfig(c)
if err != nil {
return errors.New(err, "could not parse the configuration from the policy", errors.TypeConfig)
}
if kibanaEqual(h.config.Fleet.Kibana, cfg.Fleet.Kibana) {
// already the same hosts
return nil
}
// only set protocol/hosts as that is all Fleet currently sends
h.config.Fleet.Kibana.Protocol = cfg.Fleet.Kibana.Protocol
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case client creation fails we will end up with updated values in memory which does not correspond to currently running client, we should probably do some rollback on error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call, I have added in rollback of the previous values on failure.

h.config.Fleet.Kibana.Hosts = cfg.Fleet.Kibana.Hosts
client, err := fleetapi.NewAuthWithConfig(h.log, h.config.Fleet.AccessAPIKey, h.config.Fleet.Kibana)
if err != nil {
return errors.New(
err, "fail to create API client with updated hosts",
errors.TypeNetwork, errors.M("hosts", h.config.Fleet.Kibana.Hosts))
}
reader, err := fleetToReader(h.agentInfo, h.config)
if err != nil {
return errors.New(
err, "fail to persist updated API client hosts",
errors.TypeUnexpected, errors.M("hosts", h.config.Fleet.Kibana.Hosts))
}
err = h.store.Save(reader)
if err != nil {
return errors.New(
err, "fail to persist updated API client hosts",
errors.TypeFilesystem, errors.M("hosts", h.config.Fleet.Kibana.Hosts))
}
for _, setter := range h.setters {
setter.SetClient(client)
}
return nil
}

func kibanaEqual(k1 *kibana.Config, k2 *kibana.Config) bool {
if k1.Protocol != k2.Protocol {
return false
}

sort.Strings(k1.Hosts)
sort.Strings(k2.Hosts)
if len(k1.Hosts) != len(k2.Hosts) {
return false
}
for i, v := range k1.Hosts {
if v != k2.Hosts[i] {
return false
}
}
return true
}

func fleetToReader(agentInfo *info.AgentInfo, cfg *configuration.Configuration) (io.Reader, error) {
configToStore := map[string]interface{}{
"fleet": cfg.Fleet,
"agent": map[string]interface{}{
"id": agentInfo.AgentID(),
},
}
data, err := yaml.Marshal(configToStore)
if err != nil {
return nil, err
}
return bytes.NewReader(data), nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ import (
"sync"
"testing"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand All @@ -31,6 +35,8 @@ func (m *mockEmitter) Emitter(policy *config.Config) error {
func TestPolicyChange(t *testing.T) {
log, _ := logger.New("")
ack := newNoopAcker()
agentInfo, _ := info.NewAgentInfo()
nullStore := &storage.NullStore{}

t.Run("Receive a config change and successfully emits a raw configuration", func(t *testing.T) {
emitter := &mockEmitter{}
Expand All @@ -42,7 +48,14 @@ func TestPolicyChange(t *testing.T) {
Policy: conf,
}

handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter}
cfg := configuration.DefaultConfiguration()
handler := &handlerPolicyChange{
log: log,
emitter: emitter.Emitter,
agentInfo: agentInfo,
config: cfg,
store: nullStore,
}

err := handler.Handle(context.Background(), action, ack)
require.NoError(t, err)
Expand All @@ -60,7 +73,14 @@ func TestPolicyChange(t *testing.T) {
Policy: conf,
}

handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter}
cfg := configuration.DefaultConfiguration()
handler := &handlerPolicyChange{
log: log,
emitter: emitter.Emitter,
agentInfo: agentInfo,
config: cfg,
store: nullStore,
}

err := handler.Handle(context.Background(), action, ack)
require.Error(t, err)
Expand All @@ -69,6 +89,9 @@ func TestPolicyChange(t *testing.T) {

func TestPolicyAcked(t *testing.T) {
log, _ := logger.New("")
agentInfo, _ := info.NewAgentInfo()
nullStore := &storage.NullStore{}

t.Run("Config change should not ACK on error", func(t *testing.T) {
tacker := &testAcker{}

Expand All @@ -83,7 +106,14 @@ func TestPolicyAcked(t *testing.T) {
Policy: config,
}

handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter}
cfg := configuration.DefaultConfiguration()
handler := &handlerPolicyChange{
log: log,
emitter: emitter.Emitter,
agentInfo: agentInfo,
config: cfg,
store: nullStore,
}

err := handler.Handle(context.Background(), action, tacker)
require.Error(t, err)
Expand All @@ -105,7 +135,14 @@ func TestPolicyAcked(t *testing.T) {
Policy: config,
}

handler := &handlerPolicyChange{log: log, emitter: emitter.Emitter}
cfg := configuration.DefaultConfiguration()
handler := &handlerPolicyChange{
log: log,
emitter: emitter.Emitter,
agentInfo: agentInfo,
config: cfg,
store: nullStore,
}

err := handler.Handle(context.Background(), action, tacker)
require.NoError(t, err)
Expand Down
16 changes: 12 additions & 4 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,12 +209,17 @@ func newManaged(
acker,
combinedReporter)

policyChanger := &handlerPolicyChange{
log: log,
emitter: emit,
agentInfo: agentInfo,
config: cfg,
store: store,
setters: []clientSetter{acker},
}
actionDispatcher.MustRegister(
&fleetapi.ActionPolicyChange{},
&handlerPolicyChange{
log: log,
emitter: emit,
},
policyChanger,
)

actionDispatcher.MustRegister(
Expand Down Expand Up @@ -264,6 +269,9 @@ func newManaged(
if err != nil {
return nil, err
}
// add the gateway to setters, so the gateway can be updated
// when the hosts for Kibana are updated by the policy.
policyChanger.setters = append(policyChanger.setters, gateway)

managedApplication.gateway = gateway
return managedApplication, nil
Expand Down
12 changes: 10 additions & 2 deletions x-pack/elastic-agent/pkg/agent/application/managed_mode_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,14 @@ import (
"encoding/json"
"testing"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/application/info"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configrequest"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/storage"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/fleetapi"
Expand All @@ -34,18 +37,23 @@ func TestManagedModeRouting(t *testing.T) {
log, _ := logger.New("")
router, _ := newRouter(log, streamFn)
agentInfo, _ := info.NewAgentInfo()
nullStore := &storage.NullStore{}
composableCtrl, _ := composable.New(log, nil)
emit, err := emitter(ctx, log, agentInfo, composableCtrl, router, &configModifiers{Decorators: []decoratorFunc{injectMonitoring}})
require.NoError(t, err)

actionDispatcher, err := newActionDispatcher(ctx, log, &handlerDefault{log: log})
require.NoError(t, err)

cfg := configuration.DefaultConfiguration()
actionDispatcher.MustRegister(
&fleetapi.ActionPolicyChange{},
&handlerPolicyChange{
log: log,
emitter: emit,
log: log,
emitter: emit,
agentInfo: agentInfo,
config: cfg,
store: nullStore,
},
)

Expand Down
8 changes: 5 additions & 3 deletions x-pack/elastic-agent/pkg/agent/storage/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@ import (

const perms os.FileMode = 0600

type store interface {
// Store saves the io.Reader.
type Store interface {
// Save the io.Reader.
Save(io.Reader) error
}

Expand Down Expand Up @@ -62,12 +64,12 @@ type ReplaceOnSuccessStore struct {
target string
replaceWith []byte

wrapped store
wrapped Store
}

// NewReplaceOnSuccessStore takes a target file and a replacement content and will replace the target
// file content if the wrapped store execution is done without any error.
func NewReplaceOnSuccessStore(target string, replaceWith []byte, wrapped store) *ReplaceOnSuccessStore {
func NewReplaceOnSuccessStore(target string, replaceWith []byte, wrapped Store) *ReplaceOnSuccessStore {
return &ReplaceOnSuccessStore{
target: target,
replaceWith: replaceWith,
Expand Down