diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c419087dc5ba..c68c1661558d 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -39,6 +39,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d - [Azure Logs] Fix authentication_processing_details parsing in sign-in logs. {issue}34330[34330] {pull}34478[34478] - Fix dropped events when monitor a beat under the agent and send its `Host info` log entry. {pull}34599[34599] +- Fix panics when a processor is closed twice {pull}34647[34647] *Heartbeat* @@ -120,23 +121,3 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d *Functionbeat* ==== Known Issue - - - - - - - - - - - - - - - - - - - - diff --git a/filebeat/channel/runner.go b/filebeat/channel/runner.go index 11e31ad88415..ca6a1cda3fab 100644 --- a/filebeat/channel/runner.go +++ b/filebeat/channel/runner.go @@ -126,28 +126,28 @@ func newCommonConfigEditor( return nil, err } - var indexProcessor processors.Processor - if !config.Index.IsEmpty() { - staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) - timestampFormat, err := - fmtstr.NewTimestampFormatString(&config.Index, staticFields) - if err != nil { - return nil, err - } - indexProcessor = add_formatted_index.New(timestampFormat) - } - - userProcessors, err := processors.New(config.Processors) - if err != nil { - return nil, err - } - serviceType := config.ServiceType if serviceType == "" { serviceType = config.Module } return func(clientCfg beat.ClientConfig) (beat.ClientConfig, error) { + var indexProcessor processors.Processor + if !config.Index.IsEmpty() { + staticFields := fmtstr.FieldsForBeat(beatInfo.Beat, beatInfo.Version) + timestampFormat, err := + fmtstr.NewTimestampFormatString(&config.Index, staticFields) + if err != nil { + return clientCfg, err + } + indexProcessor = add_formatted_index.New(timestampFormat) + } + + userProcessors, err := processors.New(config.Processors) + if err != nil { + return clientCfg, err + } + meta := clientCfg.Processing.Meta.Clone() fields := clientCfg.Processing.Fields.Clone() @@ -191,6 +191,6 @@ func newCommonConfigEditor( func setOptional(to common.MapStr, key string, value string) { if value != "" { - to.Put(key, value) + _, _ = to.Put(key, value) } } diff --git a/libbeat/processors/safe_processor.go b/libbeat/processors/safe_processor.go index c8498cb2d78b..0032dd53855c 100644 --- a/libbeat/processors/safe_processor.go +++ b/libbeat/processors/safe_processor.go @@ -23,6 +23,7 @@ import ( "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/common" + "github.com/elastic/beats/v7/libbeat/logp" ) var ErrClosed = errors.New("attempt to use a closed processor") @@ -45,6 +46,7 @@ func (p *SafeProcessor) Close() (err error) { if atomic.CompareAndSwapUint32(&p.closed, 0, 1) { return Close(p.Processor) } + logp.L().Warnf("tried to close already closed %q processor", p.Processor.String()) return nil }