Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

filebeat: input v2 compat uses random ID for CheckConfig #41585

Merged
merged 10 commits into from
Nov 14, 2024
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Change log.file.path field in awscloudwatch input to nested object. {pull}41099[41099]
- Remove deprecated awscloudwatch field from Filebeat. {pull}41089[41089]
- The performance of ingesting SQS data with the S3 input has improved by up to 60x for queues with many small events. `max_number_of_messages` config for SQS mode is now ignored, as the new design no longer needs a manual cap on messages. Instead, use `number_of_workers` to scale ingestion rate in both S3 and SQS modes. The increased efficiency may increase network bandwidth consumption, which can be throttled by lowering `number_of_workers`. It may also increase number of events stored in memory, which can be throttled by lowering the configured size of the internal queue. {pull}40699[40699]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]

- Add kafka compression support for ZSTD.

Expand Down
37 changes: 35 additions & 2 deletions filebeat/input/v2/compat/compat.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"fmt"
"sync"

"github.com/gofrs/uuid/v5"
"github.com/mitchellh/hashstructure"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
Expand Down Expand Up @@ -73,12 +74,19 @@ func RunnerFactory(
}

func (f *factory) CheckConfig(cfg *conf.C) error {
_, err := f.loader.Configure(cfg)
// just check the config, therefore to avoid potential side effects (ID duplication)
// change the ID.
checkCfg, err := f.generateCheckConfig(cfg)
if err != nil {
f.log.Warnw(fmt.Sprintf("input V2 factory.CheckConfig failed to clone config before checking it. Original config will be checked, it might trigger an input duplication warning: %v", err), "original_config", conf.DebugString(cfg, true))
checkCfg = cfg
}
rdner marked this conversation as resolved.
Show resolved Hide resolved
_, err = f.loader.Configure(checkCfg)
rdner marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return fmt.Errorf("runner factory could not check config: %w", err)
}

if err = f.loader.Delete(cfg); err != nil {
if err = f.loader.Delete(checkCfg); err != nil {
return fmt.Errorf(
"runner factory failed to delete an input after config check: %w",
err)
Expand Down Expand Up @@ -176,3 +184,28 @@ func configID(config *conf.C) (string, error) {

return fmt.Sprintf("%16X", id), nil
}

func (f *factory) generateCheckConfig(config *conf.C) (*conf.C, error) {
// copy the config so it's safe to change it
testCfg, err := conf.NewConfigFrom(config)
rdner marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
return nil, fmt.Errorf("failed to create new config: %w", err)
}

// let's try to override the `id` field, if it fails, give up
inputID, err := testCfg.String("id", -1)
if err != nil {
return nil, fmt.Errorf("failed to get 'id': %w", err)
}

id, err := uuid.NewV4()
if err != nil {
return nil, fmt.Errorf("failed to generate check congig id: %w", err)
}
err = testCfg.SetString("id", -1, inputID+"-"+id.String())
if err != nil {
return nil, fmt.Errorf("failed to set 'id': %w", err)
}

return testCfg, nil
}
113 changes: 113 additions & 0 deletions filebeat/input/v2/compat/compat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
package compat

import (
"errors"
"fmt"
"sync"
"testing"

Expand Down Expand Up @@ -62,6 +64,72 @@ func TestRunnerFactory_CheckConfig(t *testing.T) {
assert.Equal(t, 0, countRun)
})

t.Run("does not cause input ID duplication", func(t *testing.T) {
log := logp.NewLogger("test")
var countConfigure, countTest, countRun int
var runWG sync.WaitGroup
var ids = map[string]int{}
var idsMu sync.Mutex

// setup
plugins := inputest.SinglePlugin("test", &inputest.MockInputManager{
OnConfigure: func(cfg *conf.C) (v2.Input, error) {
idsMu.Lock()
defer idsMu.Unlock()
id, err := cfg.String("id", -1)
assert.NoError(t, err, "OnConfigure: could not get 'id' fom config")
idsCount := ids[id]
ids[id] = idsCount + 1

countConfigure++
return &inputest.MockInput{
OnTest: func(_ v2.TestContext) error { countTest++; return nil },
OnRun: func(_ v2.Context, _ beat.PipelineConnector) error {
runWG.Done()
countRun++
return nil
},
}, nil
},
})
loader := inputest.MustNewTestLoader(t, plugins, "type", "test")
factory := RunnerFactory(log, beat.Info{}, loader.Loader)

inputID := "filestream-kubernetes-pod-aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4"
inputCfg := fmt.Sprintf(`
id: %s
parsers:
- container: null
paths:
- /var/log/containers/*aee2af1c6365ecdd72416f44aab49cd8bdc7522ab008c39784b7fd9d46f794a4.log
prospector:
scanner:
symlinks: true
take_over: true
type: test
`, inputID)

runner, err := factory.Create(nil, conf.MustNewConfigFrom(inputCfg))
require.NoError(t, err, "could not create input")

runWG.Add(1)
runner.Start()
defer runner.Stop()
// wait input to be running
runWG.Wait()

err = factory.CheckConfig(conf.MustNewConfigFrom(inputCfg))
require.NoError(t, err, "unexpected error when calling CheckConfig")

// validate: configured an input, but do not run test or run
assert.Equal(t, 2, countConfigure, "OnConfigure should be called only 2 times")
assert.Equal(t, 0, countTest, "OnTest should not have been called")
assert.Equal(t, 1, countRun, "OnRun should be called only once")
idsMu.Lock()
assert.Equal(t, 1, ids[inputID])
idsMu.Unlock()
})

t.Run("fail if input type is unknown to loader", func(t *testing.T) {
log := logp.NewLogger("test")
plugins := inputest.SinglePlugin("test", inputest.ConstInputManager(nil))
Expand Down Expand Up @@ -118,3 +186,48 @@ func TestRunnerFactory_CreateAndRun(t *testing.T) {
assert.Error(t, err)
})
}

func TestGenerateCheckConfig(t *testing.T) {
tcs := []struct {
name string
cfg *conf.C
want *conf.C
wantErr error
assertCfg func(t assert.TestingT, expected interface{}, actual interface{}, msgAndArgs ...interface{}) bool
}{
{
name: "id is present",
cfg: conf.MustNewConfigFrom("id: some-id"),
assertCfg: assert.NotEqual,
},
{
name: "absent id",
cfg: conf.MustNewConfigFrom(""),
wantErr: errors.New("failed to get 'id'"),
assertCfg: func(t assert.TestingT, _ interface{}, got interface{}, msgAndArgs ...interface{}) bool {
return assert.Nil(t, got, msgAndArgs...)
},
},
{
name: "invalid config",
cfg: nil,
wantErr: errors.New("failed to create new config"),
assertCfg: func(t assert.TestingT, _ interface{}, got interface{}, msgAndArgs ...interface{}) bool {
return assert.Nil(t, got, msgAndArgs...)
},
},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
f := factory{}

got, err := f.generateCheckConfig(tc.cfg)
if tc.wantErr != nil {
assert.ErrorContains(t, err, tc.wantErr.Error())
}

tc.assertCfg(t, tc.cfg, got)
})
}
}
Loading