Skip to content

Commit

Permalink
resolved conflict
Browse files Browse the repository at this point in the history
  • Loading branch information
ShourieG committed Jan 7, 2025
2 parents 589c8c8 + 177a47a commit 8288e17
Show file tree
Hide file tree
Showing 30 changed files with 753 additions and 110 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,10 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fixes filestream logging the error "filestream input with ID 'ID' already exists, this will lead to data duplication[...]" on Kubernetes when using autodiscover. {pull}41585[41585]
- Add kafka compression support for ZSTD.
- Filebeat fails to start if there is any input with a duplicated ID. It logs the duplicated IDs and the offending inputs configurations. {pull}41731[41731]
- Filestream inputs with duplicated IDs will fail to start. An error is logged showing the ID and the full input configuration. {issue}41938[41938] {pull}41954[41954]
- Filestream inputs can define `allow_deprecated_id_duplication: true` to keep the previous behaviour of running inputs with duplicated IDs. {issue}41938[41938] {pull}41954[41954]
- The Filestream input only starts to ingest a file when it is >= 1024 bytes in size. This happens because `fingerprint` is the default file identity now. To restore the previous behaviour, set `file_identity.native: ~` and `prospector.scanner.fingerprint.enabled: false` {issue}40197[40197] {pull}41762[41762]

*Heartbeat*


Expand Down Expand Up @@ -200,6 +203,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix awss3 document ID construction when using the CSV decoder. {pull}42019[42019]
- The `_id` generation process for S3 events has been updated to incorporate the LastModified field. This enhancement ensures that the `_id` is unique. {pull}42078[42078]
- Fix Netflow Template Sharing configuration handling. {pull}42080[42080]
- Updated websocket retry error code list to allow more scenarios to be retried which could have been missed previously. {pull}42218[42218]

*Heartbeat*

Expand Down Expand Up @@ -233,6 +237,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Do not report non-existent 0 values for RSS metrics in docker/memory {pull}41449[41449]
- Log Cisco Meraki `getDevicePerformanceScores` errors without stopping metrics collection. {pull}41622[41622]
- Don't skip first bucket value in GCP metrics metricset for distribution type metrics {pull}41822[41822]
- [K8s Integration] Enhance HTTP authentication in case of token updates for Apiserver, Controllermanager and Scheduler metricsets {issue}41910[41910] {pull}42016[42016]
- Fixed `creation_date` scientific notation output in the `elasticsearch.index` metricset. {pull}42053[42053]
- Fix bug where metricbeat unintentionally triggers Windows ASR. {pull}42177[42177]

Expand Down Expand Up @@ -379,6 +384,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Added default values in the streaming input for websocket retries and put a cap on retry wait time to be less than or equal to the maximum defined wait time. {pull}42012[42012]
- Rate limiting fault tolerance improvements in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}42094[42094]
- Added OAuth2 support with auto token refresh for websocket streaming input. {issue}41989[41989] {pull}42212[42212]
- Added infinite & blanket retry options to websockets and improved logging and retry logic. {pull}42225[42225]

*Auditbeat*

Expand Down
4 changes: 2 additions & 2 deletions filebeat/beater/crawler.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ func (c *crawler) Start(
}

if configInputs.Enabled() {
c.inputReloader = cfgfile.NewReloader(pipeline, configInputs)
c.inputReloader = cfgfile.NewReloader(log.Named("input.reloader"), pipeline, configInputs)
if err := c.inputReloader.Check(c.inputsFactory); err != nil {
return fmt.Errorf("creating input reloader failed: %w", err)
}
}

if configModules.Enabled() {
c.modulesReloader = cfgfile.NewReloader(pipeline, configModules)
c.modulesReloader = cfgfile.NewReloader(log.Named("module.reloader"), pipeline, configModules)
if err := c.modulesReloader.Check(c.modulesFactory); err != nil {
return fmt.Errorf("creating module reloader failed: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion filebeat/beater/filebeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (fb *Filebeat) setupPipelineLoaderCallback(b *beat.Beat) error {
newPath := strings.TrimSuffix(origPath, ".yml")
_ = fb.config.ConfigModules.SetString("path", -1, newPath)
}
modulesLoader := cfgfile.NewReloader(fb.pipeline, fb.config.ConfigModules)
modulesLoader := cfgfile.NewReloader(logp.L().Named("module.reloader"), fb.pipeline, fb.config.ConfigModules)
modulesLoader.Load(modulesFactory)
}

Expand Down
17 changes: 16 additions & 1 deletion filebeat/docs/inputs/input-filestream-file-options.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,27 @@ supported.
===== `id`

A unique identifier for this filestream input. Each filestream input
must have a unique ID.
must have a unique ID. Filestream will not start inputs with duplicated IDs.

WARNING: Changing input ID may cause data duplication because the
state of the files will be lost and they will be read from the
beginning again.

[float]
[[filestream-input-allow_deprecated_id_duplication]]
===== `allow_deprecated_id_duplication`

This allows {beatname_uc} to run multiple instances of the {type}
input with the same ID. This is intended to add backwards
compatibility with the behaviour prior to 9.0. It defaults to `false`
and is **not recommended** in new configurations.

This setting is per input, so make sure to enable it in all {type}
inputs that use duplicated IDs.

WARNING: Duplicated IDs will lead to data duplication and some input
instances will not produce any metrics.

[float]
[[filestream-input-paths]]
===== `paths`
Expand Down
11 changes: 11 additions & 0 deletions filebeat/input/filestream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ type config struct {
IgnoreInactive ignoreInactiveType `config:"ignore_inactive"`
Rotation *conf.Namespace `config:"rotation"`
TakeOver bool `config:"take_over"`

// AllowIDDuplication is used by InputManager.Create
// (see internal/input-logfile/manager.go).
AllowIDDuplication bool `config:"allow_deprecated_id_duplication"`
}

type closerConfig struct {
Expand Down Expand Up @@ -142,6 +146,13 @@ func (c *config) Validate() error {
return fmt.Errorf("no path is configured")
}

if c.AllowIDDuplication {
logp.L().Named("input.filestream").Warn(
"setting `allow_deprecated_id_duplication` will lead to data " +
"duplication and incomplete input metrics, it's use is " +
"highly discouraged.")
}

return nil
}

Expand Down
17 changes: 17 additions & 0 deletions filebeat/input/filestream/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,23 @@ func TestFilestreamMetadataUpdatedOnRename(t *testing.T) {
"prospector.scanner.check_interval": "1ms",
"prospector.scanner.fingerprint.enabled": false,
"file_identity.native": map[string]any{},
// For some reason this test became flaky, the root of the flakiness
// is not in the test, it is in how a rename operation is detected.
// Even though this test uses `os.Rename`, it does not seem to be an atomic
// operation. https://www.man7.org/linux/man-pages/man2/rename.2.html
// does not make it clear whether 'renameat' (used by `os.Rename`) is
// atomic.
//
// On a flaky execution, the file is actually perceived as removed
// and then a new file is created, both with the same inode. This
// happens on a system that does not reuse inodes as soon as they're
// freed. Because the file is detected as removed, its state is also
// removed. Then when more data is added, only the offset of the new
// data is tracked by the registry, causing the test to fail.
//
// A workaround for this is to not remove the state when the file is
// removed, hence `clean_removed: false` is set here.
"clean_removed": false,
})

testline := []byte("log line\n")
Expand Down
47 changes: 38 additions & 9 deletions filebeat/input/filestream/internal/input-logfile/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"sync"
"time"

"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/go-concert/unison"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
Expand Down Expand Up @@ -155,26 +156,54 @@ func (cim *InputManager) Create(config *conf.C) (v2.Input, error) {
}

settings := struct {
ID string `config:"id"`
CleanInactive time.Duration `config:"clean_inactive"`
HarvesterLimit uint64 `config:"harvester_limit"`
ID string `config:"id"`
CleanInactive time.Duration `config:"clean_inactive"`
HarvesterLimit uint64 `config:"harvester_limit"`
AllowIDDuplication bool `config:"allow_deprecated_id_duplication"`
}{CleanInactive: cim.DefaultCleanTimeout}
if err := config.Unpack(&settings); err != nil {
return nil, err
}

if settings.ID == "" {
cim.Logger.Error("filestream input ID without ID might lead to data" +
" duplication, please add an ID and restart Filebeat")
cim.Logger.Warn("filestream input without ID is discouraged, please add an ID and restart Filebeat")
}

metricsID := settings.ID
cim.idsMux.Lock()
if _, exists := cim.ids[settings.ID]; exists {
cim.Logger.Errorf("filestream input with ID '%s' already exists, this "+
"will lead to data duplication, please use a different ID. Metrics "+
"collection has been disabled on this input.", settings.ID)
metricsID = ""
duplicatedInput := map[string]any{}
unpackErr := config.Unpack(&duplicatedInput)
if unpackErr != nil {
duplicatedInput["error"] = fmt.Errorf("failed to unpack duplicated input config: %w", unpackErr).Error()
}

// Keep old behaviour so users can upgrade to 9.0 without
// having their inputs not starting.
if settings.AllowIDDuplication {
cim.Logger.Errorf("filestream input with ID '%s' already exists, "+
"this will lead to data duplication, please use a different "+
"ID. Metrics collection has been disabled on this input. The "+
" input will start only because "+
"'allow_deprecated_id_duplication' is set to true",
settings.ID)
metricsID = ""
} else {
cim.Logger.Errorw(
fmt.Sprintf(
"filestream input ID '%s' is duplicated: input will NOT start",
settings.ID,
),
"input.cfg", conf.DebugString(config, true))

cim.idsMux.Unlock()
return nil, &common.ErrNonReloadable{
Err: fmt.Errorf(
"filestream input with ID '%s' already exists, this "+
"will lead to data duplication, please use a different ID",
settings.ID,
)}
}
}

// TODO: improve how inputs with empty IDs are tracked.
Expand Down
124 changes: 124 additions & 0 deletions filebeat/input/filestream/internal/input-logfile/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,16 @@ package input_logfile

import (
"bytes"
"fmt"
"sync"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/statestore"
"github.com/elastic/beats/v7/libbeat/statestore/storetest"
"github.com/elastic/elastic-agent-libs/config"
Expand All @@ -42,6 +45,18 @@ func (s *testSource) Name() string {
return s.name
}

// noopProspector is a test stub whose methods do no work; the tests in this
// file return it as the Prospector from InputManager.Configure.
type noopProspector struct{}

// Init ignores all of its arguments and always returns nil.
func (m noopProspector) Init(_, _ ProspectorCleaner, _ func(Source) string) error {
return nil
}

// Run ignores all of its arguments and returns immediately.
func (m noopProspector) Run(_ v2.Context, _ StateMetadataUpdater, _ HarvesterGroup) {}

// Test always returns nil.
func (m noopProspector) Test() error {
return nil
}

func TestSourceIdentifier_ID(t *testing.T) {
testCases := map[string]struct {
userID string
Expand Down Expand Up @@ -198,6 +213,115 @@ func TestInputManager_Create(t *testing.T) {
assert.NotContains(t, buff.String(),
"already exists")
})

t.Run("does not start an input with duplicated ID", func(t *testing.T) {
tcs := []struct {
name string
id string
}{
{name: "ID is empty", id: ""},
{name: "non-empty ID", id: "non-empty-ID"},
}

for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend())
testStore, err := storeReg.Get("test")
require.NoError(t, err)

log, buff := newBufferLogger()

cim := &InputManager{
Logger: log,
StateStore: testStateStore{Store: testStore},
Configure: func(_ *config.C) (Prospector, Harvester, error) {
var wg sync.WaitGroup

return &noopProspector{}, &mockHarvester{onRun: correctOnRun, wg: &wg}, nil
}}
cfg1 := config.MustNewConfigFrom(fmt.Sprintf(`
type: filestream
id: %s
paths:
- /var/log/foo
`, tc.id))

// Create a different 2nd config with duplicated ID to ensure
// the ID itself is the only requirement to prevent the 2nd input
// from being created.
cfg2 := config.MustNewConfigFrom(fmt.Sprintf(`
type: filestream
id: %s
paths:
- /var/log/bar
`, tc.id))

_, err = cim.Create(cfg1)
require.NoError(t, err, "1st input should have been created")

// Attempt to create an input with a duplicated ID
_, err = cim.Create(cfg2)
require.Error(t, err, "filestream should not have created an input with a duplicated ID")

logs := buff.String()
// Assert the logs contain the correct log message
assert.Contains(t, logs,
fmt.Sprintf("filestream input ID '%s' is duplicated:", tc.id))

// Assert the error contains the correct text
assert.Contains(t, err.Error(),
fmt.Sprintf("filestream input with ID '%s' already exists", tc.id))
})
}
})

t.Run("allow duplicated IDs setting", func(t *testing.T) {
storeReg := statestore.NewRegistry(storetest.NewMemoryStoreBackend())
testStore, err := storeReg.Get("test")
require.NoError(t, err)

log, buff := newBufferLogger()

cim := &InputManager{
Logger: log,
StateStore: testStateStore{Store: testStore},
Configure: func(_ *config.C) (Prospector, Harvester, error) {
var wg sync.WaitGroup

return &noopProspector{}, &mockHarvester{onRun: correctOnRun, wg: &wg}, nil
}}
cfg1 := config.MustNewConfigFrom(`
type: filestream
id: duplicated-id
allow_deprecated_id_duplication: true
paths:
- /var/log/foo
`)

// Create a different 2nd config with the same ID to ensure that,
// with `allow_deprecated_id_duplication` enabled, a duplicated ID
// does not prevent the 2nd input from being created.
cfg2 := config.MustNewConfigFrom(`
type: filestream
id: duplicated-id
allow_deprecated_id_duplication: true
paths:
- /var/log/bar
`)
_, err = cim.Create(cfg1)
require.NoError(t, err, "1st input should have been created")
// Create an input with a duplicated ID
_, err = cim.Create(cfg2)
require.NoError(t, err, "filestream should not have created an input with a duplicated ID")

logs := buff.String()
// Assert the logs contain the correct log message
assert.Contains(t, logs,
"filestream input with ID 'duplicated-id' already exists, this "+
"will lead to data duplication, please use a different ID. Metrics "+
"collection has been disabled on this input.",
"did not find the expected message about the duplicated input ID")
})
}

func newBufferLogger() (*logp.Logger, *bytes.Buffer) {
Expand Down
2 changes: 1 addition & 1 deletion filebeat/tests/integration/filestream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ logging:
filebeat.WaitForLogs(
"Input 'filestream' starting",
10*time.Second,
"Filebeat did log a validation error")
"Filebeat did not log a validation error")
}

func TestFilestreamCanMigrateIdentity(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion heartbeat/beater/heartbeat.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,7 +184,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
}

if bt.config.ConfigMonitors.Enabled() {
bt.monitorReloader = cfgfile.NewReloader(b.Publisher, bt.config.ConfigMonitors)
bt.monitorReloader = cfgfile.NewReloader(logp.L().Named("module.reload"), b.Publisher, bt.config.ConfigMonitors)
defer bt.monitorReloader.Stop()

err := bt.RunReloadableMonitors()
Expand Down
Loading

0 comments on commit 8288e17

Please sign in to comment.