diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index edcacb2c9c5a..c8d3140dbfca 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -60,6 +60,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fix concurrent map writes when system/process code called from reporter code {pull}32491[32491] - Fix panics when a processor is closed twice {pull}34647[34647] - Update elastic-agent-system-metrics to v0.4.6 to allow builds on mips platforms. {pull}34674[34674] +- Fix Beats started by agent do not respect the allow_older_versions: true configuration flag {issue}34227[34227] {pull}34964[34964] *Auditbeat* diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 7d537c97db3a..57064b2e7089 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -329,7 +329,10 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) { logSystemInfo(b.Info) logp.Info("Setup Beat: %s; Version: %s", b.Info.Beat, b.Info.Version) - b.checkElasticsearchVersion() + err = b.registerESVersionCheckCallback() + if err != nil { + return nil, err + } err = b.registerESIndexManagement() if err != nil { @@ -981,15 +984,18 @@ func (b *Beat) loadDashboards(ctx context.Context, force bool) error { return nil } -// checkElasticsearchVersion registers a global callback to make sure ES instance we are connecting +// registerESVersionCheckCallback registers a global callback to make sure ES instance we are connecting // to is at least on the same version as the Beat. // If the check is disabled or the output is not Elasticsearch, nothing happens. -func (b *Beat) checkElasticsearchVersion() { - if b.isConnectionToOlderVersionAllowed() { - return - } +func (b *Beat) registerESVersionCheckCallback() error { + _, err := elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error { + if !isElasticsearchOutput(b.Config.Output.Name()) { + return errors.New("Elasticsearch output is not configured") + } + if b.isConnectionToOlderVersionAllowed() { + return nil + } - _, _ = elasticsearch.RegisterGlobalCallback(func(conn *eslegclient.Connection) error { esVersion := conn.GetVersion() beatVersion, err := libversion.New(b.Info.Version) if err != nil { @@ -1000,6 +1006,8 @@ func (b *Beat) checkElasticsearchVersion() { } return nil }) + + return err } func (b *Beat) isConnectionToOlderVersionAllowed() bool { @@ -1035,13 +1043,28 @@ func (b *Beat) indexSetupCallback() elasticsearch.ConnectCallback { } func (b *Beat) makeOutputReloader(outReloader pipeline.OutputReloader) reload.Reloadable { - return reload.ReloadableFunc(func(config *reload.ConfigWithMeta) error { + return reload.ReloadableFunc(func(update *reload.ConfigWithMeta) error { + if update == nil { + return nil + } + if b.OutputConfigReloader != nil { - if err := b.OutputConfigReloader.Reload(config); err != nil { + if err := b.OutputConfigReloader.Reload(update); err != nil { return err } } - return outReloader.Reload(config, b.createOutput) + + // we need to update the output configuration because + // some callbacks are relying on it to be up to date. + // e.g. the Elasticsearch version validation + if update.Config != nil { + err := b.Config.Output.Unpack(update.Config) + if err != nil { + return err + } + } + + return outReloader.Reload(update, b.createOutput) }) } diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index 3737b5099647..6520767d5502 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -26,6 +26,9 @@ import ( "testing" "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common/reload" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/elastic-agent-libs/config" "github.com/gofrs/uuid" "github.com/stretchr/testify/assert" @@ -224,3 +227,46 @@ func TestSanitizeIPs(t *testing.T) { }) } } + +func TestReloader(t *testing.T) { + t.Run("updates the output configuration on the beat", func(t *testing.T) { + b, err := NewBeat("testbeat", "testidx", "0.9", false) + require.NoError(t, err) + + cfg := ` +elasticsearch: + hosts: ["https://127.0.0.1:9200"] + username: "elastic" + allow_older_versions: true +` + c, err := config.NewConfigWithYAML([]byte(cfg), cfg) + require.NoError(t, err) + outCfg, err := c.Child("elasticsearch", -1) + require.NoError(t, err) + + update := &reload.ConfigWithMeta{Config: c} + m := &outputReloaderMock{} + reloader := b.makeOutputReloader(m) + + require.False(t, b.Config.Output.IsSet(), "the output should not be set yet") + require.False(t, b.isConnectionToOlderVersionAllowed(), "the flag should not be present in the empty configuration") + err = reloader.Reload(update) + require.NoError(t, err) + require.True(t, b.Config.Output.IsSet(), "now the output should be set") + require.Equal(t, outCfg, b.Config.Output.Config()) + require.Same(t, c, m.cfg.Config) + require.True(t, b.isConnectionToOlderVersionAllowed(), "the flag should be present") + }) +} + +type outputReloaderMock struct { + cfg *reload.ConfigWithMeta +} + +func (r *outputReloaderMock) Reload( + cfg *reload.ConfigWithMeta, + factory func(o outputs.Observer, cfg config.Namespace) (outputs.Group, error), +) error { + r.cfg = cfg + return nil +}