From 32f04e35646390280b077d688ecbc29ebe7b9d47 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Mon, 11 Nov 2024 15:29:47 +0100 Subject: [PATCH 1/6] filebeat: input v2 compat uses random ID for CheckConfig The CheckConfig function validates a configuration by creating and immediately discarding an input. However, a potential conflict arises when CheckConfig is used with autodiscover in Kubernetes. Autodiscover accumulates configuration changes and applies them in batches. This can be problematic if a stop event for a pod is closely followed by a start event for the same pod (e.g., during a pod restart) before the inputs are reloaded. In this scenario, autodiscover might attempt to validate the configuration for the start event while the input for the pod is already running. This would lead the filestream input manager to see two inputs with the same ID, triggering a log warning. Although this situation generates a warning, it doesn't result in data duplication, as the second input is only created to validate the configuration and is later discarded. Also, the reload process ensures only new inputs are created; any input already running won't be duplicated. --- CHANGELOG.next.asciidoc | 1 + filebeat/input/v2/compat/compat.go | 33 ++++++- filebeat/input/v2/compat/compat_test.go | 109 ++++++++++++++++++++++++ 3 files changed, 141 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 7d41f55bb162..70b84a843d48 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Change log.file.path field in awscloudwatch input to nested object. {pull}41099[41099] - Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089] - The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. 
`max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699] +- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover, - Add kafka compression support for ZSTD. diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index 416b6628e192..826c05c30596 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -24,6 +24,8 @@ import ( "context" "errors" "fmt" + "math/rand" // using for better performance + "strconv" "sync" "github.com/mitchellh/hashstructure" @@ -73,12 +75,18 @@ func RunnerFactory( } func (f *factory) CheckConfig(cfg *conf.C) error { - _, err := f.loader.Configure(cfg) + // just check the config, therefore to avoid potential side effects (ID duplication) + // change the ID. + checkCfg, err := f.generateCheckConfig(cfg) + if err != nil { + f.log.Warnw(fmt.Sprintf("input V2 factory.CheckConfig failed to clone config before checking it. 
Original config will be checked, it might trigger an input duplication warning: %v", err), "original_config", conf.DebugString(cfg, true)) + } + _, err = f.loader.Configure(checkCfg) if err != nil { return fmt.Errorf("runner factory could not check config: %w", err) } - if err = f.loader.Delete(cfg); err != nil { + if err = f.loader.Delete(checkCfg); err != nil { return fmt.Errorf( "runner factory failed to delete an input after config check: %w", err) @@ -176,3 +184,24 @@ func configID(config *conf.C) (string, error) { return fmt.Sprintf("%16X", id), nil } + +func (f *factory) generateCheckConfig(config *conf.C) (*conf.C, error) { + testCfg, err := conf.NewConfigFrom(config) + if err != nil { + return config, fmt.Errorf("failed to create new config: %w", err) + } + + // let's try to override the `inputID` field, if it fails, give up + inputID, err := testCfg.String("inputID", -1) + if err != nil { + return config, fmt.Errorf("failed to get 'inputID': %w", err) + } + + // using math/rand for performance, generate a 0-9 string + err = testCfg.SetString("inputID", -1, inputID+strconv.Itoa(rand.Intn(10))) + if err != nil { + return config, fmt.Errorf("failed to set 'inputID': %w", err) + } + + return testCfg, nil +} diff --git a/filebeat/input/v2/compat/compat_test.go b/filebeat/input/v2/compat/compat_test.go index c5092583c0e0..0c52383b7812 100644 --- a/filebeat/input/v2/compat/compat_test.go +++ b/filebeat/input/v2/compat/compat_test.go @@ -18,6 +18,8 @@ package compat import ( + "errors" + "fmt" "sync" "testing" @@ -62,6 +64,72 @@ func TestRunnerFactory_CheckConfig(t *testing.T) { assert.Equal(t, 0, countRun) }) + t.Run("does not cause input ID duplication", func(t *testing.T) { + log := logp.NewLogger("test") + var countConfigure, countTest, countRun int + var runWG sync.WaitGroup + var ids = map[string]int{} + var idsMu sync.Mutex + + // setup + plugins := inputest.SinglePlugin("test", &inputest.MockInputManager{ + OnConfigure: func(cfg *conf.C) (v2.Input, error) 
{ + idsMu.Lock() + defer idsMu.Unlock() + id, err := cfg.String("id", -1) + assert.NoError(t, err, "OnConfigure: could not get 'id' fom config") + idsCount := ids[id] + ids[id] = idsCount + 1 + + countConfigure++ + return &inputest.MockInput{ + OnTest: func(_ v2.TestContext) error { countTest++; return nil }, + OnRun: func(_ v2.Context, _ beat.PipelineConnector) error { + runWG.Done() + countRun++ + return nil + }, + }, nil + }, + }) + loader := inputest.MustNewTestLoader(t, plugins, "type", "test") + factory := RunnerFactory(log, beat.Info{}, loader.Loader) + + inputID := "filestream-kubernetes-pod-aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4" + inputCfg := fmt.Sprintf(` +id: %s +parsers: + - container: null +paths: + - /var/log/containers/*aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4.log +prospector: + scanner: + symlinks: true +take_over: true +type: test +`, inputID) + + runner, err := factory.Create(nil, conf.MustNewConfigFrom(inputCfg)) + require.NoError(t, err, "could not create input") + + runWG.Add(1) + runner.Start() + defer runner.Stop() + // wait input to be running + runWG.Wait() + + err = factory.CheckConfig(conf.MustNewConfigFrom(inputCfg)) + require.NoError(t, err, "unexpected error when calling CheckConfig") + + // validate: configured an input, but do not run test or run + assert.Equal(t, 2, countConfigure, "OnConfigure should be called only 2 times") + assert.Equal(t, 0, countTest, "OnTest should not have been called") + assert.Equal(t, 1, countRun, "OnRun should be called only once") + idsMu.Lock() + assert.Equal(t, 1, ids[inputID]) + idsMu.Unlock() + }) + t.Run("fail if input type is unknown to loader", func(t *testing.T) { log := logp.NewLogger("test") plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(nil)) @@ -118,3 +186,44 @@ func TestRunnerFactory_CreateAndRun(t *testing.T) { assert.Error(t, err) }) } + +func TestGenerateCheckConfig(t *testing.T) { + tcs := []struct { + name string + 
cfg *conf.C + want *conf.C + wantErr error + assertCfg func(t assert.TestingT, expected interface{}, actual interface{}, msgAndArgs ...interface{}) bool + }{ + { + name: "id is present", + cfg: conf.MustNewConfigFrom("inputID: some-id"), + assertCfg: assert.NotEqual, + }, + { + name: "absent id", + cfg: conf.MustNewConfigFrom(""), + wantErr: errors.New("failed to get 'inputID'"), + assertCfg: assert.Equal, + }, + { + name: "invalid config", + cfg: nil, + wantErr: errors.New("failed to create new config"), + assertCfg: assert.Equal, + }, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + f := factory{} + + got, err := f.generateCheckConfig(tc.cfg) + if tc.wantErr != nil { + assert.ErrorContains(t, err, tc.wantErr.Error()) + } + + tc.assertCfg(t, tc.cfg, got) + }) + } +} From 5103cc121442b6adf944e24dd1ea66872b13c0f8 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Mon, 11 Nov 2024 16:49:31 +0100 Subject: [PATCH 2/6] fix changelog --- CHANGELOG.next.asciidoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 70b84a843d48..302af915bf06 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -48,7 +48,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Change log.file.path field in awscloudwatch input to nested object. {pull}41099[41099] - Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089] - The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. 
It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699] -- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover, +- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585] - Add kafka compression support for ZSTD. From ab8f727ad79bd846d3d7249789681c0825674579 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Tue, 12 Nov 2024 09:01:20 +0100 Subject: [PATCH 3/6] fix refactor --- filebeat/input/v2/compat/compat.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index 826c05c30596..48e12c77d83b 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -191,14 +191,14 @@ func (f *factory) generateCheckConfig(config *conf.C) (*conf.C, error) { return config, fmt.Errorf("failed to create new config: %w", err) } - // let's try to override the `inputID` field, if it fails, give up - inputID, err := testCfg.String("inputID", -1) + // let's try to override the `id` field, if it fails, give up + inputID, err := testCfg.String("id", -1) if err != nil { return config, fmt.Errorf("failed to get 'inputID': %w", err) } // using math/rand for performance, generate a 0-9 string - err = testCfg.SetString("inputID", -1, inputID+strconv.Itoa(rand.Intn(10))) + err = testCfg.SetString("id", -1, inputID+strconv.Itoa(rand.Intn(10))) if err != nil { return config, fmt.Errorf("failed to set 'inputID': %w", err) } From 67bcd91f46cc64d78925c2cf3684400549113fef Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Tue, 12 Nov 2024 11:56:45 +0100 Subject: [PATCH 4/6] fix refactor --- filebeat/input/v2/compat/compat_test.go | 2 +- 1 file 
changed, 1 insertion(+), 1 deletion(-) diff --git a/filebeat/input/v2/compat/compat_test.go b/filebeat/input/v2/compat/compat_test.go index 0c52383b7812..2f124a66625a 100644 --- a/filebeat/input/v2/compat/compat_test.go +++ b/filebeat/input/v2/compat/compat_test.go @@ -197,7 +197,7 @@ func TestGenerateCheckConfig(t *testing.T) { }{ { name: "id is present", - cfg: conf.MustNewConfigFrom("inputID: some-id"), + cfg: conf.MustNewConfigFrom("id: some-id"), assertCfg: assert.NotEqual, }, { From fd66a371fbad4db74fb0d27e217b4bcd85e027f2 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Tue, 12 Nov 2024 18:30:21 +0100 Subject: [PATCH 5/6] use math/rand/v2 --- filebeat/input/v2/compat/compat.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index 48e12c77d83b..b56fcb7d6ea0 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -24,7 +24,7 @@ import ( "context" "errors" "fmt" - "math/rand" // using for better performance + "math/rand/v2" // using for better performance "strconv" "sync" @@ -198,7 +198,7 @@ func (f *factory) generateCheckConfig(config *conf.C) (*conf.C, error) { } // using math/rand for performance, generate a 0-9 string - err = testCfg.SetString("id", -1, inputID+strconv.Itoa(rand.Intn(10))) + err = testCfg.SetString("id", -1, inputID+strconv.Itoa(rand.IntN(10))) if err != nil { return config, fmt.Errorf("failed to set 'inputID': %w", err) } From de41ef4a5daa0abd776bd18f0c5aa2cc50f167e2 Mon Sep 17 00:00:00 2001 From: Anderson Queiroz Date: Thu, 14 Nov 2024 09:07:05 +0100 Subject: [PATCH 6/6] generateCheckConfig returns nil on error and uses uuid --- filebeat/input/v2/compat/compat.go | 18 +++++++++++------- filebeat/input/v2/compat/compat_test.go | 20 ++++++++++++-------- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/filebeat/input/v2/compat/compat.go b/filebeat/input/v2/compat/compat.go index 
b56fcb7d6ea0..fde3f2792331 100644 --- a/filebeat/input/v2/compat/compat.go +++ b/filebeat/input/v2/compat/compat.go @@ -24,10 +24,9 @@ import ( "context" "errors" "fmt" - "math/rand/v2" // using for better performance - "strconv" "sync" + "github.com/gofrs/uuid/v5" "github.com/mitchellh/hashstructure" v2 "github.com/elastic/beats/v7/filebeat/input/v2" @@ -80,6 +79,7 @@ func (f *factory) CheckConfig(cfg *conf.C) error { checkCfg, err := f.generateCheckConfig(cfg) if err != nil { f.log.Warnw(fmt.Sprintf("input V2 factory.CheckConfig failed to clone config before checking it. Original config will be checked, it might trigger an input duplication warning: %v", err), "original_config", conf.DebugString(cfg, true)) + checkCfg = cfg } _, err = f.loader.Configure(checkCfg) if err != nil { @@ -186,21 +186,25 @@ func configID(config *conf.C) (string, error) { } func (f *factory) generateCheckConfig(config *conf.C) (*conf.C, error) { + // copy the config so it's safe to change it testCfg, err := conf.NewConfigFrom(config) if err != nil { - return config, fmt.Errorf("failed to create new config: %w", err) + return nil, fmt.Errorf("failed to create new config: %w", err) } // let's try to override the `id` field, if it fails, give up inputID, err := testCfg.String("id", -1) if err != nil { - return config, fmt.Errorf("failed to get 'inputID': %w", err) + return nil, fmt.Errorf("failed to get 'id': %w", err) } - // using math/rand for performance, generate a 0-9 string - err = testCfg.SetString("id", -1, inputID+strconv.Itoa(rand.IntN(10))) + id, err := uuid.NewV4() if err != nil { - return config, fmt.Errorf("failed to set 'inputID': %w", err) + return nil, fmt.Errorf("failed to generate check congig id: %w", err) + } + err = testCfg.SetString("id", -1, inputID+"-"+id.String()) + if err != nil { + return nil, fmt.Errorf("failed to set 'id': %w", err) } return testCfg, nil diff --git a/filebeat/input/v2/compat/compat_test.go b/filebeat/input/v2/compat/compat_test.go index 
2f124a66625a..554701cdae84 100644 --- a/filebeat/input/v2/compat/compat_test.go +++ b/filebeat/input/v2/compat/compat_test.go @@ -201,16 +201,20 @@ func TestGenerateCheckConfig(t *testing.T) { assertCfg: assert.NotEqual, }, { - name: "absent id", - cfg: conf.MustNewConfigFrom(""), - wantErr: errors.New("failed to get 'inputID'"), - assertCfg: assert.Equal, + name: "absent id", + cfg: conf.MustNewConfigFrom(""), + wantErr: errors.New("failed to get 'id'"), + assertCfg: func(t assert.TestingT, _ interface{}, got interface{}, msgAndArgs ...interface{}) bool { + return assert.Nil(t, got, msgAndArgs...) + }, }, { - name: "invalid config", - cfg: nil, - wantErr: errors.New("failed to create new config"), - assertCfg: assert.Equal, + name: "invalid config", + cfg: nil, + wantErr: errors.New("failed to create new config"), + assertCfg: func(t assert.TestingT, _ interface{}, got interface{}, msgAndArgs ...interface{}) bool { + return assert.Nil(t, got, msgAndArgs...) + }, }, }