From e98b49261fb932216997199ec4d665cf35776763 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Wed, 24 Nov 2021 18:55:20 +0100 Subject: [PATCH] Skip config check in autodiscover for duplicated configurations (#29048) (#29119) If the configuration is already running, it has been already checked, don't try to check it again to avoid problems with configuration checks that fail if some resource already exist with the same identifiers. (cherry picked from commit 1207d635d4636412652e52d457af96091b8336a2) Co-authored-by: Jaime Soriano Pastor --- CHANGELOG.next.asciidoc | 1 + libbeat/autodiscover/autodiscover.go | 39 +++++++++----- libbeat/autodiscover/autodiscover_test.go | 64 +++++++++++++++++++++++ 3 files changed, 91 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 0c4a4883785c..c9bac82239ea 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -153,6 +153,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - Fix discovery of Nomad allocations with multiple events during startup. {pull}28700[28700] - Fix `fingerprint` processor to give it access to the `@timestamp` field. {issue}28683[28683] - Fix the wrong beat name on monitoring and state endpoint {issue}27755[27755] +- Skip configuration checks in autodiscover for configurations that are already running {pull}29048[29048] *Auditbeat* diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 8c6977e8362b..e1f1d8d2bfcd 100644 --- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -64,6 +64,10 @@ type Autodiscover struct { meta *meta.Map listener bus.Listener logger *logp.Logger + + // workDone is a channel used for testing purpouses, to know when the worker has + // done some work. + workDone chan struct{} } // NewAutodiscover instantiates and returns a new Autodiscover manager @@ -165,6 +169,11 @@ func (a *Autodiscover) worker() { // reset updated status updated = false } + + // For testing purpouses. + if a.workDone != nil { + a.workDone <- struct{}{} + } } } @@ -207,26 +216,30 @@ func (a *Autodiscover) handleStart(event bus.Event) bool { continue } - err = a.factory.CheckConfig(config) - if err != nil { - a.logger.Error(errors.Wrap(err, fmt.Sprintf( - "Auto discover config check failed for config '%s', won't start runner", - common.DebugString(config, true)))) - continue - } - // Update meta no matter what dynFields := a.meta.Store(hash, meta) + if _, ok := newCfg[hash]; ok { + a.logger.Debugf("Config %v duplicated in start event", common.DebugString(config, true)) + continue + } + if cfg, ok := a.configs[eventID][hash]; ok { a.logger.Debugf("Config %v is already running", common.DebugString(config, true)) newCfg[hash] = cfg continue - } else { - newCfg[hash] = &reload.ConfigWithMeta{ - Config: config, - Meta: &dynFields, - } + } + + err = a.factory.CheckConfig(config) + if err != nil { + a.logger.Error(errors.Wrap(err, fmt.Sprintf( + "Auto discover config check failed for config '%s', won't start runner", + common.DebugString(config, true)))) + continue + } + newCfg[hash] = &reload.ConfigWithMeta{ + Config: config, + Meta: &dynFields, } updated = true diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index 4b2ecfef1283..2d0ea26b6897 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -75,6 +75,8 @@ type mockAdapter struct { mutex sync.Mutex configs []*common.Config runners []*mockRunner + + CheckConfigCallCount int } // CreateConfig generates a valid list of configs from the given event, the received event will have all keys defined by `StartFilter` @@ -87,6 +89,8 @@ func (m *mockAdapter) CreateConfig(event bus.Event) ([]*common.Config, error) { // CheckConfig tests given config to check if it will work or not, returns errors in case it won't work func (m *mockAdapter) CheckConfig(c *common.Config) error { + m.CheckConfigCallCount++ + config := struct { Broken bool `config:"broken"` }{} @@ -324,6 +328,66 @@ func TestAutodiscoverHash(t *testing.T) { assert.False(t, runners[1].stopped) } +func TestAutodiscoverDuplicatedConfigConfigCheckCalledOnce(t *testing.T) { + goroutines := resources.NewGoroutinesChecker() + defer goroutines.Check(t) + + // Register mock autodiscover provider + busChan := make(chan bus.Bus, 1) + + Registry = NewRegistry() + Registry.AddProvider("mock", func(beatName string, b bus.Bus, uuid uuid.UUID, c *common.Config, k keystore.Keystore) (Provider, error) { + // intercept bus to mock events + busChan <- b + + return &mockProvider{}, nil + }) + + // Create a mock adapter that returns a duplicated config + runnerConfig, _ := common.NewConfigFrom(map[string]string{ + "id": "foo", + }) + adapter := mockAdapter{ + configs: []*common.Config{runnerConfig, runnerConfig}, + } + + // and settings: + providerConfig, _ := common.NewConfigFrom(map[string]string{ + "type": "mock", + }) + config := Config{ + Providers: []*common.Config{providerConfig}, + } + k, _ := keystore.NewFileKeystore("test") + // Create autodiscover manager + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k) + if err != nil { + t.Fatal(err) + } + + autodiscover.workDone = make(chan struct{}) + + // Start it + autodiscover.Start() + defer autodiscover.Stop() + eventBus := <-busChan + + // Publish a couple of events. + for i := 0; i < 2; i++ { + eventBus.Publish(bus.Event{ + "id": "foo", + "provider": "mock", + "start": true, + "meta": common.MapStr{ + "foo": "bar", + }, + }) + <-autodiscover.workDone + assert.Equal(t, 1, len(adapter.Runners()), "Only one runner should be started") + assert.Equal(t, 1, adapter.CheckConfigCallCount, "Check config should have been called only once") + } +} + func TestAutodiscoverWithConfigCheckFailures(t *testing.T) { goroutines := resources.NewGoroutinesChecker() defer goroutines.Check(t)