diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index b2f116bb200..f21fd1c5640 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -53,7 +53,10 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585] - Add kafka compression support for ZSTD. - Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the offending inputs configurations. {pull}41731[41731] +- Filestream inputs with duplicated IDs will fail to start. An error is logged showing the ID and the full input configuration. {issue}41938[41938] {pull}41954[41954] +- Filestream inputs can define `allow_deprecated_id_duplication: true` to keep the previous behaviour of running inputs with duplicated IDs. {issue}41938[41938] {pull}41954[41954] - The Filestream input only starts to ingest a file when it is >= 1024 bytes in size. This happens because the fingerprint` is the default file identity now. To restore the previous behaviour, set `file_identity.native: ~` and `prospector.scanner.fingerprint.enabled: false` {issue}40197[40197] {pull}41762[41762] + *Heartbeat* @@ -200,6 +203,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019] - The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078] - Fix Netflow Template Sharing configuration handling. {pull}42080[42080] +- Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. 
{pull}42218[42218] *Heartbeat* @@ -233,6 +237,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Do not report non-existant 0 values for RSS metrics in docker/memory {pull}41449[41449] - Log Cisco Meraki `getDevicePerformanceScores` errors without stopping metrics collection. {pull}41622[41622] - Don't skip first bucket value in GCP metrics metricset for distribution type metrics {pull}41822[41822] +- [K8s Integration] Enhance HTTP authentication in case of token updates for Apiserver, Controllermanager and Scheduler metricsets {issue}41910[41910] {pull}42016[42016] - Fixed `creation_date` scientific notation output in the `elasticsearch.index` metricset. {pull}42053[42053] - Fix bug where metricbeat unintentionally triggers Windows ASR. {pull}42177[42177] @@ -379,6 +384,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Added default values in the streaming input for websocket retries and put a cap on retry wait time to be lesser than equal to the maximum defined wait time. {pull}42012[42012] - Rate limiting fault tolerance improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42094[42094] - Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212] +- Added infinite & blanket retry options to websockets and improved logging and retry logic. 
{pull}42225[42225] *Auditbeat* diff --git a/filebeat/beater/crawler.go b/filebeat/beater/crawler.go index ecb7a8d1bbf..f0f5ba20b63 100644 --- a/filebeat/beater/crawler.go +++ b/filebeat/beater/crawler.go @@ -79,14 +79,14 @@ func (c *crawler) Start( } if configInputs.Enabled() { - c.inputReloader = cfgfile.NewReloader(pipeline, configInputs) + c.inputReloader = cfgfile.NewReloader(log.Named("input.reloader"), pipeline, configInputs) if err := c.inputReloader.Check(c.inputsFactory); err != nil { return fmt.Errorf("creating input reloader failed: %w", err) } } if configModules.Enabled() { - c.modulesReloader = cfgfile.NewReloader(pipeline, configModules) + c.modulesReloader = cfgfile.NewReloader(log.Named("module.reloader"), pipeline, configModules) if err := c.modulesReloader.Check(c.modulesFactory); err != nil { return fmt.Errorf("creating module reloader failed: %w", err) } diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 4909941b90a..14e3ad79f55 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -224,7 +224,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error { newPath := strings.TrimSuffix(origPath, ".yml") _ = fb.config.ConfigModules.SetString("path", -1, newPath) } - modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules) + modulesLoader := cfgfile.NewReloader(logp.L().Named("module.reloader"), fb.pipeline, fb.config.ConfigModules) modulesLoader.Load(modulesFactory) } diff --git a/filebeat/docs/inputs/input-filestream-file-options.asciidoc b/filebeat/docs/inputs/input-filestream-file-options.asciidoc index b87d9e67af6..f4fb4da3096 100644 --- a/filebeat/docs/inputs/input-filestream-file-options.asciidoc +++ b/filebeat/docs/inputs/input-filestream-file-options.asciidoc @@ -19,12 +19,27 @@ supported. ===== `id` A unique identifier for this filestream input. Each filestream input -must have a unique ID. +must have a unique ID. 
Filestream will not start inputs with duplicated IDs. WARNING: Changing input ID may cause data duplication because the state of the files will be lost and they will be read from the beginning again. +[float] +[[filestream-input-allow_deprecated_id_duplication]] +===== `allow_deprecated_id_duplication` + +This allows {beatname_uc} to run multiple instances of the {type} +input with the same ID. This is intended to add backwards +compatibility with the behaviour prior to 9.0. It defaults to `false` +and is **not recommended** in new configurations. + +This setting is per input, so make sure to enable it in all {type} +inputs that use duplicated IDs. + +WARNING: Duplicated IDs will lead to data duplication and some input +instances will not produce any metrics. + [float] [[filestream-input-paths]] ===== `paths` diff --git a/filebeat/input/filestream/config.go b/filebeat/input/filestream/config.go index 2860dd673c2..ab994fb0b52 100644 --- a/filebeat/input/filestream/config.go +++ b/filebeat/input/filestream/config.go @@ -49,6 +49,10 @@ type config struct { IgnoreInactive ignoreInactiveType `config:"ignore_inactive"` Rotation *conf.Namespace `config:"rotation"` TakeOver bool `config:"take_over"` + + // AllowIDDuplication is used by InputManager.Create + // (see internal/input-logfile/manager.go). 
+ AllowIDDuplication bool `config:"allow_deprecated_id_duplication"` } type closerConfig struct { @@ -142,6 +146,13 @@ func (c *config) Validate() error { return fmt.Errorf("no path is configured") } + if c.AllowIDDuplication { + logp.L().Named("input.filestream").Warn( + "setting `allow_deprecated_id_duplication` will lead to data " + + "duplication and incomplete input metrics, it's use is " + + "highly discouraged.") + } + return nil } diff --git a/filebeat/input/filestream/input_integration_test.go b/filebeat/input/filestream/input_integration_test.go index 5c063481dd5..e8e99213da5 100644 --- a/filebeat/input/filestream/input_integration_test.go +++ b/filebeat/input/filestream/input_integration_test.go @@ -101,6 +101,23 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) { "prospector.scanner.check_interval": "1ms", "prospector.scanner.fingerprint.enabled": false, "file_identity.native": map[string]any{}, + // For some reason this test became flaky, the root of the flakiness + // is not on the test, it is on how a rename operation is detected. + // Even though this test uses `os.Rename`, it does not seem to be an atomic + // operation. https://www.man7.org/linux/man-pages/man2/rename.2.html + // does not make it clear whether 'renameat' (used by `os.Rename`) is + // atomic. + // + // On a flaky execution, the file is actually perceived as removed + // and then a new file is created, both with the same inode. This + // happens on a system that does not reuse inodes as soon as they're + // freed. Because the file is detected as removed, its state is also + // removed. Then when more data is added, only the offset of the new + // data is tracked by the registry, causing the test to fail. + // + // A workaround for this is to not remove the state when the file is + // removed, hence `clean_removed: false` is set here. 
+ "clean_removed": false, }) testline := []byte("log line\n") diff --git a/filebeat/input/filestream/internal/input-logfile/manager.go b/filebeat/input/filestream/internal/input-logfile/manager.go index c65ccb5e308..6c7d37a2c66 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager.go +++ b/filebeat/input/filestream/internal/input-logfile/manager.go @@ -25,6 +25,7 @@ import ( "sync" "time" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/go-concert/unison" v2 "github.com/elastic/beats/v7/filebeat/input/v2" @@ -155,26 +156,54 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) { } settings := struct { - ID string `config:"id"` - CleanInactive time.Duration `config:"clean_inactive"` - HarvesterLimit uint64 `config:"harvester_limit"` + ID string `config:"id"` + CleanInactive time.Duration `config:"clean_inactive"` + HarvesterLimit uint64 `config:"harvester_limit"` + AllowIDDuplication bool `config:"allow_deprecated_id_duplication"` }{CleanInactive: cim.DefaultCleanTimeout} if err := config.Unpack(&settings); err != nil { return nil, err } if settings.ID == "" { - cim.Logger.Error("filestream input ID without ID might lead to data" + - " duplication, please add an ID and restart Filebeat") + cim.Logger.Warn("filestream input without ID is discouraged, please add an ID and restart Filebeat") } metricsID := settings.ID cim.idsMux.Lock() if _, exists := cim.ids[settings.ID]; exists { - cim.Logger.Errorf("filestream input with ID '%s' already exists, this "+ - "will lead to data duplication, please use a different ID. Metrics "+ - "collection has been disabled on this input.", settings.ID) - metricsID = "" + duplicatedInput := map[string]any{} + unpackErr := config.Unpack(&duplicatedInput) + if unpackErr != nil { + duplicatedInput["error"] = fmt.Errorf("failed to unpack duplicated input config: %w", unpackErr).Error() + } + + // Keep old behaviour so users can upgrade to 9.0 without + // having their inputs not starting. 
+ if settings.AllowIDDuplication { + cim.Logger.Errorf("filestream input with ID '%s' already exists, "+ + "this will lead to data duplication, please use a different "+ + "ID. Metrics collection has been disabled on this input. The "+ + " input will start only because "+ + "'allow_deprecated_id_duplication' is set to true", + settings.ID) + metricsID = "" + } else { + cim.Logger.Errorw( + fmt.Sprintf( + "filestream input ID '%s' is duplicated: input will NOT start", + settings.ID, + ), + "input.cfg", conf.DebugString(config, true)) + + cim.idsMux.Unlock() + return nil, &common.ErrNonReloadable{ + Err: fmt.Errorf( + "filestream input with ID '%s' already exists, this "+ + "will lead to data duplication, please use a different ID", + settings.ID, + )} + } } // TODO: improve how inputs with empty IDs are tracked. diff --git a/filebeat/input/filestream/internal/input-logfile/manager_test.go b/filebeat/input/filestream/internal/input-logfile/manager_test.go index f13f51772f8..19c2eead9a9 100644 --- a/filebeat/input/filestream/internal/input-logfile/manager_test.go +++ b/filebeat/input/filestream/internal/input-logfile/manager_test.go @@ -19,6 +19,8 @@ package input_logfile import ( "bytes" + "fmt" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -26,6 +28,7 @@ import ( "go.uber.org/zap" "go.uber.org/zap/zapcore" + v2 "github.com/elastic/beats/v7/filebeat/input/v2" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/storetest" "github.com/elastic/elastic-agent-libs/config" @@ -42,6 +45,18 @@ func (s *testSource) Name() string { return s.name } +type noopProspector struct{} + +func (m noopProspector) Init(_, _ ProspectorCleaner, _ func(Source) string) error { + return nil +} + +func (m noopProspector) Run(_ v2.Context, _ StateMetadataUpdater, _ HarvesterGroup) {} + +func (m noopProspector) Test() error { + return nil +} + func TestSourceIdentifier_ID(t *testing.T) { testCases := map[string]struct { userID string @@ 
-198,6 +213,115 @@ func TestInputManager_Create(t *testing.T) { assert.NotContains(t, buff.String(), "already exists") }) + + t.Run("does not start an input with duplicated ID", func(t *testing.T) { + tcs := []struct { + name string + id string + }{ + {name: "ID is empty", id: ""}, + {name: "non-empty ID", id: "non-empty-ID"}, + } + + for _, tc := range tcs { + t.Run(tc.name, func(t *testing.T) { + storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend()) + testStore, err := storeReg.Get("test") + require.NoError(t, err) + + log, buff := newBufferLogger() + + cim := &InputManager{ + Logger: log, + StateStore: testStateStore{Store: testStore}, + Configure: func(_ *config.C) (Prospector, Harvester, error) { + var wg sync.WaitGroup + + return &noopProspector{}, &mockHarvester{onRun: correctOnRun, wg: &wg}, nil + }} + cfg1 := config.MustNewConfigFrom(fmt.Sprintf(` +type: filestream +id: %s +paths: + - /var/log/foo +`, tc.id)) + + // Create a different 2nd config with duplicated ID to ensure + // the ID itself is the only requirement to prevent the 2nd input + // from being created. 
+ cfg2 := config.MustNewConfigFrom(fmt.Sprintf(` +type: filestream +id: %s +paths: + - /var/log/bar +`, tc.id)) + + _, err = cim.Create(cfg1) + require.NoError(t, err, "1st input should have been created") + + // Attempt to create an input with a duplicated ID + _, err = cim.Create(cfg2) + require.Error(t, err, "filestream should not have created an input with a duplicated ID") + + logs := buff.String() + // Assert the logs contain the correct log message + assert.Contains(t, logs, + fmt.Sprintf("filestream input ID '%s' is duplicated:", tc.id)) + + // Assert the error contains the correct text + assert.Contains(t, err.Error(), + fmt.Sprintf("filestream input with ID '%s' already exists", tc.id)) + }) + } + }) + + t.Run("allow duplicated IDs setting", func(t *testing.T) { + storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend()) + testStore, err := storeReg.Get("test") + require.NoError(t, err) + + log, buff := newBufferLogger() + + cim := &InputManager{ + Logger: log, + StateStore: testStateStore{Store: testStore}, + Configure: func(_ *config.C) (Prospector, Harvester, error) { + var wg sync.WaitGroup + + return &noopProspector{}, &mockHarvester{onRun: correctOnRun, wg: &wg}, nil + }} + cfg1 := config.MustNewConfigFrom(` +type: filestream +id: duplicated-id +allow_deprecated_id_duplication: true +paths: + - /var/log/foo +`) + + // Create a different 2nd config with the same ID to ensure + // the 2nd input is still created when + // `allow_deprecated_id_duplication` is set to true. 
+ cfg2 := config.MustNewConfigFrom(` +type: filestream +id: duplicated-id +allow_deprecated_id_duplication: true +paths: + - /var/log/bar +`) + _, err = cim.Create(cfg1) + require.NoError(t, err, "1st input should have been created") + // Create an input with a duplicated ID + _, err = cim.Create(cfg2) + require.NoError(t, err, "filestream should not have created an input with a duplicated ID") + + logs := buff.String() + // Assert the logs contain the correct log message + assert.Contains(t, logs, + "filestream input with ID 'duplicated-id' already exists, this "+ + "will lead to data duplication, please use a different ID. Metrics "+ + "collection has been disabled on this input.", + "did not find the expected message about the duplicated input ID") + }) } func newBufferLogger() (*logp.Logger, *bytes.Buffer) { diff --git a/filebeat/tests/integration/filestream_test.go b/filebeat/tests/integration/filestream_test.go index 24125469dd8..bc0b96b4377 100644 --- a/filebeat/tests/integration/filestream_test.go +++ b/filebeat/tests/integration/filestream_test.go @@ -274,7 +274,7 @@ logging: filebeat.WaitForLogs( "Input 'filestream' starting", 10*time.Second, - "Filebeat did log a validation error") + "Filebeat did not log a validation error") } func TestFilestreamCanMigrateIdentity(t *testing.T) { diff --git a/heartbeat/beater/heartbeat.go b/heartbeat/beater/heartbeat.go index 227b375ee90..bb6f73b2fde 100644 --- a/heartbeat/beater/heartbeat.go +++ b/heartbeat/beater/heartbeat.go @@ -184,7 +184,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error { } if bt.config.ConfigMonitors.Enabled() { - bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors) + bt.monitorReloader = cfgfile.NewReloader(logp.L().Named("module.reload"), b.Publisher, bt.config.ConfigMonitors) defer bt.monitorReloader.Stop() err := bt.RunReloadableMonitors() diff --git a/libbeat/autodiscover/autodiscover.go b/libbeat/autodiscover/autodiscover.go index 36408051952..27739c2e561 100644 
--- a/libbeat/autodiscover/autodiscover.go +++ b/libbeat/autodiscover/autodiscover.go @@ -24,6 +24,7 @@ import ( "github.com/elastic/beats/v7/libbeat/autodiscover/meta" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/elastic-agent-autodiscover/bus" conf "github.com/elastic/elastic-agent-libs/config" @@ -169,7 +170,10 @@ func (a *Autodiscover) worker() { updated = false // On error, make sure the next run also updates because some runners were not properly loaded - retry = err != nil + retry = common.IsInputReloadable(err) + if err != nil && !retry { + a.logger.Errorw("all new inputs failed to start with a non-retriable error", err) + } if retry { // The recoverable errors that can lead to retry are related // to the harvester state, so we need to give the publishing diff --git a/libbeat/autodiscover/autodiscover_test.go b/libbeat/autodiscover/autodiscover_test.go index 5343c093941..40c15d7de66 100644 --- a/libbeat/autodiscover/autodiscover_test.go +++ b/libbeat/autodiscover/autodiscover_test.go @@ -18,8 +18,11 @@ package autodiscover import ( + "bytes" "encoding/json" + "errors" "fmt" + "path/filepath" "reflect" "strings" "sync" @@ -29,9 +32,12 @@ import ( "github.com/gofrs/uuid/v5" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/cfgfile" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/tests/resources" "github.com/elastic/elastic-agent-autodiscover/bus" conf "github.com/elastic/elastic-agent-libs/config" @@ -116,10 +122,22 @@ func (m *mockAdapter) CheckConfig(c *conf.C) error { return nil } +// Create returns a mockRunner with the provided config. 
If +// the config contains `err_non_reloadable: true`, then a +// common.ErrNonReloadable is returned alongside a nil runner. func (m *mockAdapter) Create(_ beat.PipelineConnector, config *conf.C) (cfgfile.Runner, error) { + // On error false is returned, that's enough to keep a correct behaviour + nonReloadable, _ := config.Bool("err_non_reloadable", -1) + if nonReloadable { + return nil, common.ErrNonReloadable{ + Err: errors.New("a non reloadable error"), + } + } + runner := &mockRunner{ config: config, } + m.mutex.Lock() defer m.mutex.Unlock() m.runners = append(m.runners, runner) @@ -800,3 +818,107 @@ func check(t *testing.T, runners []*mockRunner, expected *conf.C, started, stopp } t.Fatalf("expected cfg %v to be started=%v stopped=%v but have %v", out, started, stopped, runners) } + +func TestErrNonReloadableIsNotRetried(t *testing.T) { + // Register mock autodiscover provider + busChan := make(chan bus.Bus, 1) + Registry = NewRegistry() + err := Registry.AddProvider( + "mock", + func(beatName string, + b bus.Bus, + uuid uuid.UUID, + c *conf.C, + k keystore.Keystore) (Provider, error) { + + // intercept bus to mock events + busChan <- b + + return &mockProvider{}, nil + }) + if err != nil { + t.Fatalf("cannot add provider to registry: %s", err) + } + + // Create a mock adapter, 'err_non_reloadable' will make its Create method + // to return a common.ErrNonReloadable. 
+ adapter := mockAdapter{ + configs: []*conf.C{ + conf.MustNewConfigFrom(map[string]any{ + "err_non_reloadable": true, + }), + }, + } + + // and settings: + providerConfig, _ := conf.NewConfigFrom(map[string]string{ + "type": "mock", + }) + config := Config{ + Providers: []*conf.C{providerConfig}, + } + k, _ := keystore.NewFileKeystore(filepath.Join(t.TempDir(), "keystore")) + // Create autodiscover manager + autodiscover, err := NewAutodiscover("test", nil, &adapter, &adapter, &config, k) + if err != nil { + t.Fatal(err) + } + + logger, logsBuffer := newBufferLogger() + autodiscover.logger = logger + // set the debounce period to something small in order to + // speed up the tests. This seems to be the sweet spot + // for the fastest test run + autodiscover.debouncePeriod = time.Millisecond + + // Start it + autodiscover.Start() + defer autodiscover.Stop() + eventBus := <-busChan + + // Send an event to the bus, the event itself is not important + // because the mockAdapter will return the same configs regardless + // of the event + eventBus.Publish(bus.Event{ + // That's used in the last assertion, the config key is + // <provider>:<id>, here "mock:foo" + "id": "foo", + "provider": "mock", + "start": true, + "meta": mapstr.M{ + "test_name": t.Name(), + }, + }) + + // Ensure we logged the error about not retrying reloading input + require.Eventually( + t, + func() bool { + return strings.Contains( + logsBuffer.String(), + `all new inputs failed to start with a non-retriable error","error":"Error creating runner from config: ErrNonReloadable: a non reloadable error`, + ) + }, + time.Second*10, + time.Millisecond*10, + "foo error") + + // Ensure nothing is running + requireRunningRunners(t, autodiscover, 0) + runners := adapter.Runners() + require.Equal(t, len(runners), 0) + + // Ensure the autodiscover got the config + require.Equal(t, len(autodiscover.configs["mock:foo"]), 1) +} + +func newBufferLogger() (*logp.Logger, *bytes.Buffer) { + buf := &bytes.Buffer{} + encoderConfig := 
zap.NewProductionEncoderConfig() + encoder := zapcore.NewJSONEncoder(encoderConfig) + writeSyncer := zapcore.AddSync(buf) + log := logp.NewLogger("", zap.WrapCore(func(_ zapcore.Core) zapcore.Core { + return zapcore.NewCore(encoder, writeSyncer, zapcore.DebugLevel) + })) + return log, buf +} diff --git a/libbeat/cfgfile/list.go b/libbeat/cfgfile/list.go index 3a9e3429d29..3e076ee4c14 100644 --- a/libbeat/cfgfile/list.go +++ b/libbeat/cfgfile/list.go @@ -22,7 +22,6 @@ import ( "fmt" "sync" - "github.com/joeshaw/multierror" "github.com/mitchellh/hashstructure" "github.com/elastic/beats/v7/libbeat/beat" @@ -71,8 +70,7 @@ func (r *RunnerList) Runners() []Runner { // // Runners might fail to start, it's the callers responsibility to // handle any error. During execution, any encountered errors are -// accumulated in a `multierror.Errors` and returned as -// a `multierror.MultiError` upon completion. +// accumulated in a []error and returned as errors.Join(errs...) upon completion. // // While the stopping of runners occurs on separate goroutines, // Reload will wait for all runners to finish before starting any new runners. @@ -85,7 +83,7 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error { r.mutex.Lock() defer r.mutex.Unlock() - var errs multierror.Errors + var errs []error startList := map[uint64]*reload.ConfigWithMeta{} stopList := r.copyRunnerList() @@ -179,7 +177,7 @@ func (r *RunnerList) Reload(configs []*reload.ConfigWithMeta) error { // above it is done asynchronously. moduleRunning.Set(int64(len(r.runners))) - return errs.Err() + return errors.Join(errs...) 
} // Stop all runners diff --git a/libbeat/cfgfile/reload.go b/libbeat/cfgfile/reload.go index 930bd56eafd..2d802f1b30d 100644 --- a/libbeat/cfgfile/reload.go +++ b/libbeat/cfgfile/reload.go @@ -26,6 +26,7 @@ import ( "github.com/joeshaw/multierror" "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" @@ -42,8 +43,6 @@ var ( }, } - debugf = logp.MakeDebug("cfgfile") - // configScans measures how many times the config dir was scanned for // changes, configReloads measures how many times there were changes that // triggered an actual reload. @@ -101,10 +100,11 @@ type Reloader struct { path string done chan struct{} wg sync.WaitGroup + logger *logp.Logger } // NewReloader creates new Reloader instance for the given config -func NewReloader(pipeline beat.PipelineConnector, cfg *config.C) *Reloader { +func NewReloader(logger *logp.Logger, pipeline beat.PipelineConnector, cfg *config.C) *Reloader { conf := DefaultDynamicConfig _ = cfg.Unpack(&conf) @@ -118,6 +118,7 @@ func NewReloader(pipeline beat.PipelineConnector, cfg *config.C) *Reloader { config: conf, path: path, done: make(chan struct{}), + logger: logger, } } @@ -128,7 +129,7 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { return nil } - debugf("Checking module configs from: %s", rl.path) + rl.logger.Debugf("Checking module configs from: %s", rl.path) gw := NewGlobWatcher(rl.path) files, _, err := gw.Scan() @@ -142,7 +143,7 @@ func (rl *Reloader) Check(runnerFactory RunnerFactory) error { return fmt.Errorf("loading configs: %w", err) } - debugf("Number of module configs found: %v", len(configs)) + rl.logger.Debugf("Number of module configs found: %v", len(configs)) // Initialize modules for _, c := range configs { @@ -190,7 +191,7 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { return case 
<-time.After(rl.config.Reload.Period): - debugf("Scan for new config files") + rl.logger.Debug("Scan for new config files") configScans.Add(1) files, updated, err := gw.Scan() @@ -209,13 +210,19 @@ func (rl *Reloader) Run(runnerFactory RunnerFactory) { // Load all config objects configs, _ := rl.loadConfigs(files) - debugf("Number of module configs found: %v", len(configs)) + rl.logger.Debugf("Number of module configs found: %v", len(configs)) err = list.Reload(configs) - // Force reload on the next iteration if and only if this one failed. - // (Any errors are already logged by list.Reload, so we don't need to - // propagate the details further.) - forceReload = err != nil + // Force reload on the next iteration if and only if the error + // can be retried. + // Errors are already logged by list.Reload, so we don't need to + // propagate details any further. + forceReload = common.IsInputReloadable(err) + if forceReload { + rl.logger.Debugf("error '%v' can be retried. Will try again in %s", err, rl.config.Reload.Period.String()) + } else { + rl.logger.Debugf("error '%v' cannot retried. Modify any input file to reload.", err) + } } // Path loading is enabled but not reloading. Loads files only once and then stops. 
@@ -240,7 +247,7 @@ func (rl *Reloader) Load(runnerFactory RunnerFactory) { gw := NewGlobWatcher(rl.path) - debugf("Scan for config files") + rl.logger.Debug("Scan for config files") files, _, err := gw.Scan() if err != nil { logp.Err("Error fetching new config files: %v", err) @@ -249,7 +256,7 @@ func (rl *Reloader) Load(runnerFactory RunnerFactory) { // Load all config objects configs, _ := rl.loadConfigs(files) - debugf("Number of module configs found: %v", len(configs)) + rl.logger.Debugf("Number of module configs found: %v", len(configs)) if err := list.Reload(configs); err != nil { logp.Err("Error loading configuration files: %+v", err) diff --git a/libbeat/cfgfile/reload_test.go b/libbeat/cfgfile/reload_integration_test.go similarity index 82% rename from libbeat/cfgfile/reload_test.go rename to libbeat/cfgfile/reload_integration_test.go index f28fbd7033a..e5102764b68 100644 --- a/libbeat/cfgfile/reload_test.go +++ b/libbeat/cfgfile/reload_integration_test.go @@ -21,25 +21,34 @@ package cfgfile import ( "fmt" - "io/ioutil" "os" + "path/filepath" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) func TestReloader(t *testing.T) { // Create random temp directory - dir, err := ioutil.TempDir("", "libbeat-reloader") - defer os.RemoveAll(dir) + dir, err := os.MkdirTemp("", "libbeat-reloader") + defer func() { + if t.Failed() { + t.Logf("test failed, temp dir '%s' was kept", dir) + return + } + os.RemoveAll(dir) + }() + if err != nil { - t.Fatal(err) + t.Fatalf("could not create temp dir: %s", err) } - glob := dir + "/*.yml" + glob := filepath.Join(dir, "*.yml") config := conf.MustNewConfigFrom(mapstr.M{ "path": glob, @@ -49,7 +58,7 @@ func TestReloader(t *testing.T) { }, }) // config.C{} - reloader := NewReloader(nil, config) + reloader := 
NewReloader(logp.L().Named("cfgfile-test.reload"), nil, config) retryCount := 10 go reloader.Run(nil) @@ -70,11 +79,17 @@ func TestReloader(t *testing.T) { // The first scan should cause a reload, but additional ones should not, // so configReloads should still be 1. - assert.Equal(t, int64(1), configReloads.Get()) + require.Equalf( + t, + int64(1), + configReloads.Get(), + "config reload should be called once, but it was called %d times", + configReloads.Get(), + ) // Write a file to the reloader path to trigger a real reload content := []byte("test\n") - err = ioutil.WriteFile(dir+"/config1.yml", content, 0644) + err = os.WriteFile(filepath.Join(dir, "config1.yml"), content, 0644) assert.NoError(t, err) // Wait for the number of scans to increase at least twice. This is somewhat diff --git a/libbeat/common/errors.go b/libbeat/common/errors.go index 9f5248e815e..9eddc425ae5 100644 --- a/libbeat/common/errors.go +++ b/libbeat/common/errors.go @@ -18,6 +18,7 @@ package common import ( + "errors" "fmt" ) @@ -31,3 +32,51 @@ type ErrInputNotFinished struct { func (e *ErrInputNotFinished) Error() string { return fmt.Sprintf("Can only start an input when all related states are finished: %+v", e.State) } + +type ErrNonReloadable struct { + Err error +} + +func (e ErrNonReloadable) Error() string { + return fmt.Sprintf("ErrNonReloadable: %v", e.Err) +} + +func (e ErrNonReloadable) Unwrap() error { return e.Err } + +func (e ErrNonReloadable) Is(err error) bool { + switch err.(type) { + case ErrNonReloadable: + return true + default: + return errors.Is(e.Err, err) + } +} + +// IsInputReloadable returns true if err, or any error wrapped +// by err can be retried. +// +// Effectively, it will only return false if ALL +// errors are ErrNonReloadable. 
+func IsInputReloadable(err error) bool { + if err == nil { + return false + } + + type unwrapList interface { + Unwrap() []error + } + + //nolint:errorlint // we only want to check that specific error, not all errors in the chain + errList, isErrList := err.(unwrapList) + if !isErrList { + return !errors.Is(err, ErrNonReloadable{}) + } + + for _, e := range errList.Unwrap() { + if !errors.Is(e, ErrNonReloadable{}) { + return true + } + } + + return false +} diff --git a/libbeat/common/errors_test.go b/libbeat/common/errors_test.go new file mode 100644 index 00000000000..6c83c675bf1 --- /dev/null +++ b/libbeat/common/errors_test.go @@ -0,0 +1,70 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +package common + +import ( + "errors" + "fmt" + "testing" +) + +func TestIsInputReloadable(t *testing.T) { + testCases := map[string]struct { + err error + expected bool + }{ + "nil error is not retriable": { + err: nil, + expected: false, + }, + "simple error": { + err: errors.New("a generic error"), + expected: true, + }, + "common.ErrNonReloadable": { + err: ErrNonReloadable{}, + expected: false, + }, + "wrapped common.ErrNonReloadable": { + err: fmt.Errorf("wrapping %w", ErrNonReloadable{}), + expected: false, + }, + "errors.Join, all errors are ErrNonReloadable": { + err: errors.Join(ErrNonReloadable{}, ErrNonReloadable{}), + expected: false, + }, + "errors.Join, only one is ErrNonReloadable": { + err: errors.Join(errors.New("generic reloadable error"), ErrNonReloadable{}), + expected: true, + }, + } + + for name, tc := range testCases { + t.Run(name, func(t *testing.T) { + reloadable := IsInputReloadable(tc.err) + if reloadable != tc.expected { + t.Errorf( + "expecting isReloadable to return %t, but got %t for: '%v'", + tc.expected, + reloadable, + tc.err, + ) + } + }) + } +} diff --git a/metricbeat/beater/metricbeat.go b/metricbeat/beater/metricbeat.go index bcb9a893a87..0695e53ea90 100644 --- a/metricbeat/beater/metricbeat.go +++ b/metricbeat/beater/metricbeat.go @@ -264,7 +264,7 @@ func (bt *Metricbeat) Run(b *beat.Beat) error { // Dynamic file based modules (metricbeat.config.modules) if bt.config.ConfigModules.Enabled() { - moduleReloader := cfgfile.NewReloader(b.Publisher, bt.config.ConfigModules) + moduleReloader := cfgfile.NewReloader(logp.L().Named("module.reload"), b.Publisher, bt.config.ConfigModules) if err := moduleReloader.Check(factory); err != nil { return err diff --git a/metricbeat/helper/prometheus/prometheus.go b/metricbeat/helper/prometheus/prometheus.go index f4e06df7e1f..50929c128c3 100644 --- a/metricbeat/helper/prometheus/prometheus.go +++ b/metricbeat/helper/prometheus/prometheus.go @@ -35,6 +35,9 @@ const acceptHeader = 
`text/plain;version=0.0.4;q=0.5,*/*;q=0.1` // Prometheus helper retrieves prometheus formatted metrics type Prometheus interface { + // GetHttp returns the HTTP Client that handles the connection towards remote endpoint + GetHttp() (*helper.HTTP, error) + // GetFamilies requests metric families from prometheus endpoint and returns them GetFamilies() ([]*MetricFamily, error) @@ -66,6 +69,15 @@ func NewPrometheusClient(base mb.BaseMetricSet) (Prometheus, error) { return &prometheus{http, base.Logger()}, nil } +// GetHttp returns HTTP Client +func (p *prometheus) GetHttp() (*helper.HTTP, error) { + httpClient, ok := p.httpfetcher.(*helper.HTTP) + if !ok { + return nil, fmt.Errorf("httpfetcher is not of type *helper.HTTP") + } + return httpClient, nil +} + // GetFamilies requests metric families from prometheus endpoint and returns them func (p *prometheus) GetFamilies() ([]*MetricFamily, error) { var reader io.Reader diff --git a/metricbeat/module/kubernetes/apiserver/metricset.go b/metricbeat/module/kubernetes/apiserver/metricset.go index 9dd9a81976d..5457093e553 100644 --- a/metricbeat/module/kubernetes/apiserver/metricset.go +++ b/metricbeat/module/kubernetes/apiserver/metricset.go @@ -19,9 +19,14 @@ package apiserver import ( "fmt" + "net/http" + "strings" + "time" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -29,9 +34,11 @@ import ( // Metricset for apiserver is a prometheus based metricset type Metricset struct { mb.BaseMetricSet + http *helper.HTTP prometheusClient prometheus.Prometheus prometheusMappings *prometheus.MetricsMapping clusterMeta mapstr.M + mod k8smod.Module } var _ mb.ReportingMetricSetV2Error = (*Metricset)(nil) @@ -41,11 +48,23 @@ func New(base 
mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + + http, err := pc.GetHttp() + if err != nil { + return nil, fmt.Errorf("the http connection is not valid") + } ms := &Metricset{ BaseMetricSet: base, + http: http, prometheusClient: pc, prometheusMappings: mapping, clusterMeta: util.AddClusterECSMeta(base), + mod: mod, } return ms, nil @@ -54,20 +73,36 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers information from the apiserver and reports events with this information. func (m *Metricset) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + errorString := fmt.Sprintf("%s", err) + errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized) + if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) { + count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error + for count > 0 { + if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil { + events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + } + if err != nil { + time.Sleep(m.mod.Config().Period) + count-- + } else { + break + } + } + } + // We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed if err != nil { return fmt.Errorf("error getting metrics: %w", err) - } - - for _, e := range events { - event := mb.TransformMapStrToEvent("kubernetes", e, nil) - if len(m.clusterMeta) != 0 { - event.RootFields.DeepUpdate(m.clusterMeta) - } - isOpen := reporter.Event(event) - if !isOpen { - return nil + } else { + for _, e := range events { + event := mb.TransformMapStrToEvent("kubernetes", e, nil) + if len(m.clusterMeta) != 0 { + event.RootFields.DeepUpdate(m.clusterMeta) + } + isOpen := reporter.Event(event) + if 
!isOpen { + return nil + } } + return nil } - - return nil } diff --git a/metricbeat/module/kubernetes/controllermanager/controllermanager.go b/metricbeat/module/kubernetes/controllermanager/controllermanager.go index dbfcddc2b6b..6c7b1c8ae52 100644 --- a/metricbeat/module/kubernetes/controllermanager/controllermanager.go +++ b/metricbeat/module/kubernetes/controllermanager/controllermanager.go @@ -19,9 +19,14 @@ package controllermanager import ( "fmt" + "net/http" + "strings" + "time" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" "github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -74,9 +79,11 @@ func init() { // MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling. type MetricSet struct { mb.BaseMetricSet + http *helper.HTTP prometheusClient prometheus.Prometheus prometheusMappings *prometheus.MetricsMapping clusterMeta mapstr.M + mod k8smod.Module } // New create a new instance of the MetricSet @@ -87,11 +94,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + + http, err := pc.GetHttp() + if err != nil { + return nil, fmt.Errorf("the http connection is not valid") + } ms := &MetricSet{ BaseMetricSet: base, + http: http, prometheusClient: pc, prometheusMappings: mapping, clusterMeta: util.AddClusterECSMeta(base), + mod: mod, } return ms, nil } @@ -99,19 +118,37 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers information from the apiserver and reports events with this information. 
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + errorString := fmt.Sprintf("%s", err) + errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized) + if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) { + count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error + for count > 0 { + if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil { + events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + } + if err != nil { + time.Sleep(m.mod.Config().Period) + count-- + } else { + break + } + } + } + // We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed if err != nil { return fmt.Errorf("error getting metrics: %w", err) - } - for _, e := range events { - event := mb.TransformMapStrToEvent("kubernetes", e, nil) - if len(m.clusterMeta) != 0 { - event.RootFields.DeepUpdate(m.clusterMeta) - } - isOpen := reporter.Event(event) - if !isOpen { - return nil + } else { + for _, e := range events { + event := mb.TransformMapStrToEvent("kubernetes", e, nil) + if len(m.clusterMeta) != 0 { + event.RootFields.DeepUpdate(m.clusterMeta) + } + isOpen := reporter.Event(event) + if !isOpen { + return nil + } } - } - return nil + return nil + } } diff --git a/metricbeat/module/kubernetes/scheduler/scheduler.go b/metricbeat/module/kubernetes/scheduler/scheduler.go index f512c96b7f2..1b563ad000a 100644 --- a/metricbeat/module/kubernetes/scheduler/scheduler.go +++ b/metricbeat/module/kubernetes/scheduler/scheduler.go @@ -19,9 +19,14 @@ package scheduler import ( "fmt" + "net/http" + "strings" + "time" + "github.com/elastic/beats/v7/metricbeat/helper" "github.com/elastic/beats/v7/metricbeat/helper/prometheus" "github.com/elastic/beats/v7/metricbeat/mb" + k8smod "github.com/elastic/beats/v7/metricbeat/module/kubernetes" 
"github.com/elastic/beats/v7/metricbeat/module/kubernetes/util" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -78,9 +83,11 @@ func init() { // MetricSet implements the mb.PushMetricSet interface, and therefore does not rely on polling. type MetricSet struct { mb.BaseMetricSet + http *helper.HTTP prometheusClient prometheus.Prometheus prometheusMappings *prometheus.MetricsMapping clusterMeta mapstr.M + mod k8smod.Module } // New create a new instance of the MetricSet @@ -91,11 +98,23 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { if err != nil { return nil, err } + + mod, ok := base.Module().(k8smod.Module) + if !ok { + return nil, fmt.Errorf("must be child of kubernetes module") + } + + http, err := pc.GetHttp() + if err != nil { + return nil, fmt.Errorf("the http connection is not valid") + } ms := &MetricSet{ BaseMetricSet: base, + http: http, prometheusClient: pc, prometheusMappings: mapping, clusterMeta: util.AddClusterECSMeta(base), + mod: mod, } return ms, nil } @@ -103,20 +122,37 @@ func New(base mb.BaseMetricSet) (mb.MetricSet, error) { // Fetch gathers information from the apiserver and reports events with this information. 
func (m *MetricSet) Fetch(reporter mb.ReporterV2) error { events, err := m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + errorString := fmt.Sprintf("%s", err) + errorUnauthorisedMsg := fmt.Sprintf("unexpected status code %d", http.StatusUnauthorized) + if err != nil && strings.Contains(errorString, errorUnauthorisedMsg) { + count := 2 // We retry twice to refresh the Authorisation token in case of http.StatusUnauthorize = 401 Error + for count > 0 { + if _, errAuth := m.http.RefreshAuthorizationHeader(); errAuth == nil { + events, err = m.prometheusClient.GetProcessedMetrics(m.prometheusMappings) + } + if err != nil { + time.Sleep(m.mod.Config().Period) + count-- + } else { + break + } + } + } + // We need to check for err again in case error is not 401 or RefreshAuthorizationHeader has failed if err != nil { return fmt.Errorf("error getting metrics: %w", err) - } - - for _, e := range events { - event := mb.TransformMapStrToEvent("kubernetes", e, nil) - if len(m.clusterMeta) != 0 { - event.RootFields.DeepUpdate(m.clusterMeta) + } else { + for _, e := range events { + event := mb.TransformMapStrToEvent("kubernetes", e, nil) + if len(m.clusterMeta) != 0 { + event.RootFields.DeepUpdate(m.clusterMeta) + } + isOpen := reporter.Event(event) + if !isOpen { + return nil + } } - isOpen := reporter.Event(event) - if !isOpen { - return nil - } - } - return nil + return nil + } } diff --git a/metricbeat/module/linux/rapl/msr_test b/metricbeat/module/linux/rapl/msr_test deleted file mode 100755 index 4e5a0cb83bb..00000000000 Binary files a/metricbeat/module/linux/rapl/msr_test and /dev/null differ diff --git a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc index 5239c7a78ed..bebb5260bef 100644 --- a/x-pack/filebeat/docs/inputs/input-streaming.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-streaming.asciidoc @@ -359,7 +359,7 @@ This specifies whether fields should be replaced with a `*` or deleted 
entirely [float] ==== `retry` -The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. +The `retry` configuration allows the user to specify the number of times the input should attempt to reconnect to the streaming data source in the event of a connection failure. The default value is `nil` which means no retries will be attempted. It has a `wait_min` and `wait_max` configuration which specifies the minimum and maximum time to wait between retries. It also supports blanket retries and infinite retries via the `blanket_retries` and `infinite_retries` configuration options. These are set to `false` by default. ["source","yaml",subs="attributes"] ---- @@ -376,6 +376,8 @@ filebeat.inputs: max_attempts: 5 wait_min: 1s wait_max: 10s + blanket_retries: false + infinite_retries: false ---- [float] ==== `retry.max_attempts` @@ -392,6 +394,16 @@ The minimum time to wait between retries. This ensures that retries are spaced o The maximum time to wait between retries. This prevents the retry mechanism from becoming too slow, ensuring that the client does not wait indefinitely between retries. This is crucial in systems where timeouts or user experience are critical. For example, `wait_max` might be set to 10 seconds, meaning that even if the calculated backoff is greater than this, the client will wait at most 10 seconds before retrying. The default value is `30` seconds. +[float] +==== `retry.blanket_retries` + +Normally the input will only retry when a connection error is found to be retryable based on the error type and the RFC 6455 error codes defined by the websocket protocol.
If `blanket_retries` is set to `true` (`false` by default) the input will retry on any error. This is not recommended unless the user is certain that all errors are transient and can be resolved by retrying. + +[float] +==== `retry.infinite_retries` + +Normally the input will only retry a maximum of `max_attempts` times. If `infinite_retries` is set to `true` (`false` by default) the input will retry indefinitely. This is not recommended unless the user is certain that the connection will eventually succeed. + [float] === `timeout` Timeout is the maximum amount of time the websocket dialer will wait for a connection to be established. The default value is `180` seconds. diff --git a/x-pack/filebeat/input/streaming/config.go b/x-pack/filebeat/input/streaming/config.go index 08eebf7262d..753c36febf3 100644 --- a/x-pack/filebeat/input/streaming/config.go +++ b/x-pack/filebeat/input/streaming/config.go @@ -66,9 +66,11 @@ type redact struct { } type retry struct { - MaxAttempts int `config:"max_attempts"` - WaitMin time.Duration `config:"wait_min"` - WaitMax time.Duration `config:"wait_max"` + MaxAttempts int `config:"max_attempts"` + WaitMin time.Duration `config:"wait_min"` + WaitMax time.Duration `config:"wait_max"` + BlanketRetries bool `config:"blanket_retries"` + InfiniteRetries bool `config:"infinite_retries"` } type authConfig struct { @@ -162,7 +164,7 @@ func (c config) Validate() error { if c.Retry != nil { switch { - case c.Retry.MaxAttempts <= 0: + case c.Retry.MaxAttempts <= 0 && !c.Retry.InfiniteRetries: return errors.New("max_attempts must be greater than zero") case c.Retry.WaitMin > c.Retry.WaitMax: return errors.New("wait_min must be less than or equal to wait_max") diff --git a/x-pack/filebeat/input/streaming/config_test.go b/x-pack/filebeat/input/streaming/config_test.go index b58f17bc762..5adb9b3ed81 100644 --- a/x-pack/filebeat/input/streaming/config_test.go +++ b/x-pack/filebeat/input/streaming/config_test.go @@ -141,6 +141,18 @@ var configTests = 
[]struct { "url": "wss://localhost:443/v1/stream", }, }, + { + name: "valid_retry_with_infinite", + config: map[string]interface{}{ + "retry": map[string]interface{}{ + "infinite_retries": true, + "max_attempts": 0, + "wait_min": "1s", + "wait_max": "2s", + }, + "url": "wss://localhost:443/v1/stream", + }, + }, { name: "valid_authStyle_in_params", config: map[string]interface{}{ diff --git a/x-pack/filebeat/input/streaming/input_test.go b/x-pack/filebeat/input/streaming/input_test.go index 70ba545b3ab..df9b406e17c 100644 --- a/x-pack/filebeat/input/streaming/input_test.go +++ b/x-pack/filebeat/input/streaming/input_test.go @@ -452,7 +452,7 @@ var inputTests = []struct { "wait_max": "2s", }, }, - wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake"), + wantErr: fmt.Errorf("failed to establish WebSocket connection after 2 attempts with error websocket: bad handshake and (status 403)"), }, { name: "single_event_tls", diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 486db1f204d..375101d81c1 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -195,7 +195,7 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { _, message, err := c.ReadMessage() if err != nil { s.metrics.errorsTotal.Inc() - if !isRetryableError(err) { + if !s.cfg.Retry.BlanketRetries && !isRetryableError(err) { s.log.Errorw("failed to read websocket data", "error", err) return err } @@ -253,6 +253,9 @@ func isRetryableError(err error) bool { websocket.CloseInternalServerErr, websocket.CloseTryAgainLater, websocket.CloseServiceRestart, + websocket.CloseAbnormalClosure, + websocket.CloseMessageTooBig, + websocket.CloseNoStatusReceived, websocket.CloseTLSHandshake: return true } @@ -307,21 +310,38 @@ func connectWebSocket(ctx context.Context, cfg config, url string, log *logp.Log } if cfg.Retry != nil { 
retryConfig := cfg.Retry - for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { - conn, response, err = dialer.DialContext(ctx, url, headers) - if err == nil { - return conn, response, nil + if !retryConfig.InfiniteRetries { + for attempt := 1; attempt <= retryConfig.MaxAttempts; attempt++ { + conn, response, err = dialer.DialContext(ctx, url, headers) + if err == nil { + return conn, response, nil + } + //nolint:errorlint // it will never be a wrapped error at this point + if err == websocket.ErrBadHandshake { + log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) + } else { + log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode) + } + waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) + time.Sleep(waitTime) } - //nolint:errorlint // it will never be a wrapped error at this point - if err == websocket.ErrBadHandshake { - log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) - continue + return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w and (status %d)", retryConfig.MaxAttempts, err, response.StatusCode) + } else { + for attempt := 1; ; attempt++ { + conn, response, err = dialer.DialContext(ctx, url, headers) + if err == nil { + return conn, response, nil + } + //nolint:errorlint // it will never be a wrapped error at this point + if err == websocket.ErrBadHandshake { + log.Errorf("attempt %d: webSocket connection failed with bad handshake (status %d) retrying...\n", attempt, response.StatusCode) + } else { + log.Errorf("attempt %d: webSocket connection failed with error %v and (status %d), retrying...\n", attempt, err, response.StatusCode) + } + waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) + 
time.Sleep(waitTime) } - log.Debugf("attempt %d: webSocket connection failed. retrying...\n", attempt) - waitTime := calculateWaitTime(retryConfig.WaitMin, retryConfig.WaitMax, attempt) - time.Sleep(waitTime) } - return nil, nil, fmt.Errorf("failed to establish WebSocket connection after %d attempts with error %w", retryConfig.MaxAttempts, err) } return dialer.DialContext(ctx, url, headers) diff --git a/x-pack/libbeat/management/managerV2.go b/x-pack/libbeat/management/managerV2.go index 288f28ae0de..c5f074e13d7 100644 --- a/x-pack/libbeat/management/managerV2.go +++ b/x-pack/libbeat/management/managerV2.go @@ -14,7 +14,6 @@ import ( "syscall" "time" - "github.com/joeshaw/multierror" "go.uber.org/zap/zapcore" "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" @@ -681,10 +680,19 @@ func (cm *BeatV2Manager) reload(units map[unitKey]*agentUnit) { // // in v2 only a single input type will be started per component, so we don't need to // worry about getting multiple re-loaders (we just need the one for the type) - if err := cm.reloadInputs(inputUnits); err != nil { - merror := &multierror.MultiError{} - if errors.As(err, &merror) { - for _, err := range merror.Errors { + if err := cm.reloadInputs(inputUnits); err != nil { // HERE + // cm.reloadInputs will use fmt.Errorf and join an error slice + // using errors.Join, so we need to unwrap the fmt wrapped error, + // then we can iterate over the errors list. 
+ err = errors.Unwrap(err) + type unwrapList interface { + Unwrap() []error + } + + //nolint:errorlint // That's a custom logic based on how reloadInputs builds the error + errList, isErrList := err.(unwrapList) + if isErrList { + for _, err := range errList.Unwrap() { unitErr := cfgfile.UnitError{} if errors.As(err, &unitErr) { unitErrors[unitErr.UnitID] = append(unitErrors[unitErr.UnitID], unitErr.Err) @@ -824,8 +832,7 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*agentUnit) error { } if err := obj.Reload(inputBeatCfgs); err != nil { - merror := &multierror.MultiError{} - realErrors := multierror.Errors{} + realErrors := []error{} // At the moment this logic is tightly bound to the current RunnerList // implementation from libbeat/cfgfile/list.go and Input.loadStates from @@ -833,8 +840,12 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*agentUnit) error { // If they change the way they report errors, this will break. // TODO (Tiago): update all layers to use the most recent features from // the standard library errors package. 
- if errors.As(err, &merror) { - for _, err := range merror.Errors { + type unwrapList interface { + Unwrap() []error + } + errList, isErrList := err.(unwrapList) //nolint:errorlint // see the comment above + if isErrList { + for _, err := range errList.Unwrap() { causeErr := errors.Unwrap(err) // A Log input is only marked as finished when all events it // produced are acked by the acker so when we see this error, @@ -855,7 +866,7 @@ func (cm *BeatV2Manager) reloadInputs(inputUnits []*agentUnit) error { } if len(realErrors) != 0 { - return fmt.Errorf("failed to reload inputs: %w", realErrors.Err()) + return fmt.Errorf("failed to reload inputs: %w", errors.Join(realErrors...)) } } else { // If there was no error reloading input and forceReload was diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index e7515266b0b..73804565dbf 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -12,7 +12,6 @@ import ( "testing" "time" - "github.com/joeshaw/multierror" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap/zapcore" @@ -547,14 +546,14 @@ func TestErrorPerUnit(t *testing.T) { r.MustRegisterOutput(output) inputs := &mockReloadable{ ReloadFn: func(configs []*reload.ConfigWithMeta) error { - errs := multierror.Errors{} + errs := []error{} for _, input := range configs { errs = append(errs, cfgfile.UnitError{ UnitID: input.InputUnitID, Err: errors.New(errorMessages[input.InputUnitID]), }) } - return errs.Err() + return errors.Join(errs...) }, } r.MustRegisterInput(inputs)