Skip to content

Commit

Permalink
[7.17](backport #34761) Stop re-using processors defined in the config (
Browse files Browse the repository at this point in the history
#34764)

* Stop re-using processors defined in the config (#34761)

* Stop re-using processors defined in the config

After introducing the `SafeProcessor` wrapper in
#34647 we started returning
errors when a processor is being used after its `Close` function has
been called.

This led to dropped events and error spam in logs but also confirmed
that the root cause of the problem was not just a race condition on
`Close` but re-used processors somewhere.

After a long investigation such code that's re-using processors was
finally found.

This is the change that removes re-using the processors and
instantiates them on each input restart.

* Fix linter issues

* Add changelog entry

(cherry picked from commit 5cfe62c)

# Conflicts:
#	filebeat/channel/runner.go
#	libbeat/processors/safe_processor.go

* Resolve conflicts, fix changelog

* Add new line to changelog

* Revert comment auto-formatting

---------

Co-authored-by: Denis <[email protected]>
  • Loading branch information
mergify[bot] and rdner authored Mar 8, 2023
1 parent b850575 commit 48a5664
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 37 deletions.
21 changes: 1 addition & 20 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d

- [Azure Logs] Fix authentication_processing_details parsing in sign-in logs. {issue}34330[34330] {pull}34478[34478]
- Fix dropped events when monitor a beat under the agent and send its `Host info` log entry. {pull}34599[34599]
- Fix panics when a processor is closed twice {pull}34647[34647]

*Heartbeat*

Expand Down Expand Up @@ -120,23 +121,3 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
*Functionbeat*

==== Known Issue




















34 changes: 17 additions & 17 deletions filebeat/channel/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,28 +126,28 @@ func newCommonConfigEditor(
return nil, err
}

var indexProcessor processors.Processor
if !config.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
timestampFormat, err :=
fmtstr.NewTimestampFormatString(&config.Index, staticFields)
if err != nil {
return nil, err
}
indexProcessor = add_formatted_index.New(timestampFormat)
}

userProcessors, err := processors.New(config.Processors)
if err != nil {
return nil, err
}

serviceType := config.ServiceType
if serviceType == "" {
serviceType = config.Module
}

return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) {
var indexProcessor processors.Processor
if !config.Index.IsEmpty() {
staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version)
timestampFormat, err :=
fmtstr.NewTimestampFormatString(&config.Index, staticFields)
if err != nil {
return clientCfg, err
}
indexProcessor = add_formatted_index.New(timestampFormat)
}

userProcessors, err := processors.New(config.Processors)
if err != nil {
return clientCfg, err
}

meta := clientCfg.Processing.Meta.Clone()
fields := clientCfg.Processing.Fields.Clone()

Expand Down Expand Up @@ -191,6 +191,6 @@ func newCommonConfigEditor(

func setOptional(to common.MapStr, key string, value string) {
if value != "" {
to.Put(key, value)
_, _ = to.Put(key, value)
}
}
2 changes: 2 additions & 0 deletions libbeat/processors/safe_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/common"
"github.com/elastic/beats/v7/libbeat/logp"
)

var ErrClosed = errors.New("attempt to use a closed processor")
Expand All @@ -45,6 +46,7 @@ func (p *SafeProcessor) Close() (err error) {
if atomic.CompareAndSwapUint32(&p.closed, 0, 1) {
return Close(p.Processor)
}
logp.L().Warnf("tried to close already closed %q processor", p.Processor.String())
return nil
}

Expand Down

0 comments on commit 48a5664

Please sign in to comment.