diff --git a/filebeat/channel/runner.go b/filebeat/channel/runner.go index ca6a1cda3fab..7f1ec01a709e 100644 --- a/filebeat/channel/runner.go +++ b/filebeat/channel/runner.go @@ -72,6 +72,7 @@ func (f *onCreateFactory) Create(pipeline beat.PipelineConnector, cfg *common.Co // configuration file settings. // // Common settings ensured by this factory wrapper: +<<<<<<< HEAD // - *fields*: common fields to be added to the pipeline // - *fields_under_root*: select at which level to store the fields // - *tags*: add additional tags to the events @@ -83,6 +84,19 @@ func (f *onCreateFactory) Create(pipeline beat.PipelineConnector, cfg *common.Co // - *index*: Configure the index name for events to be collected from this input // - *type*: implicit event type // - *service.type*: implicit event type +======= +// - *fields*: common fields to be added to the pipeline +// - *fields_under_root*: select at which level to store the fields +// - *tags*: add additional tags to the events +// - *processors*: list of local processors to be added to the processing pipeline +// - *keep_null*: keep or remove 'null' from events to be published +// - *_module_name* (hidden setting): Add fields describing the module name +// - *_ fileset_name* (hidden setting): +// - *pipeline*: Configure the ES Ingest Node pipeline name to be used for events from this input +// - *index*: Configure the index name for events to be collected from this input +// - *type*: implicit event type +// - *service.type*: implicit event type +>>>>>>> 3d917c8519 (Add test for the processor re-use issue (#34870)) func RunnerFactoryWithCommonInputSettings(info beat.Info, f cfgfile.RunnerFactory) cfgfile.RunnerFactory { return wrapRunnerCreate(f, func( diff --git a/filebeat/channel/runner_mock_test.go b/filebeat/channel/runner_mock_test.go new file mode 100644 index 000000000000..e784d833e9e3 --- /dev/null +++ b/filebeat/channel/runner_mock_test.go @@ -0,0 +1,126 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package channel + +import ( + "testing" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/cfgfile" + + conf "github.com/elastic/elastic-agent-libs/config" + + "github.com/stretchr/testify/require" +) + +type runnerFactoryMock struct { + clientCount int + cfgs []beat.ClientConfig +} + +func (r *runnerFactoryMock) Create(p beat.PipelineConnector, config *conf.C) (cfgfile.Runner, error) { + // When using the connector multiple times to create a client + // it's using the same editor function for creating a new client + // with a modified configuration that includes predefined processing. + // This is why we must make sure nothing is re-used from one client to another. + for i := 0; i < r.clientCount; i++ { + client, err := p.ConnectWith(beat.ClientConfig{}) + if err != nil { + return nil, err + } + + // storing the config that the client was created with + // it's needed for the `Assert` later + r.cfgs = append(r.cfgs, client.(*clientMock).cfg) + } + return &struct { + cfgfile.Runner + }{}, nil +} + +func (runnerFactoryMock) CheckConfig(config *conf.C) error { + return nil +} + +// Assert runs various checks for the clients created by the wrapped pipeline connector +// We check that the processing configuration does not reference the same addresses as before, +// re-using some parts of the processing configuration will result in various issues, such as: +// * closing processors multiple times +// * using closed processors +// * modifiying an object shared by multiple pipeline clients +func (r runnerFactoryMock) Assert(t *testing.T) { + t.Helper() + + // we need to make sure `Assert` is called after `Create` + require.Len(t, r.cfgs, r.clientCount) + + t.Run("new processing configuration each time", func(t *testing.T) { + for i, c1 := range r.cfgs { + for j, c2 := range r.cfgs { + if i == j { + continue + } + + require.NotSamef(t, c1.Processing, c2.Processing, "processing configuration cannot be re-used") + require.NotSamef(t, c1.Processing.Meta, c2.Processing.Meta, "`Processing.Meta` cannot be re-used") + require.NotSamef(t, c1.Processing.Fields, c2.Processing.Fields, "`Processing.Fields` cannot be re-used") + require.NotSamef(t, c1.Processing.Processor, c2.Processing.Processor, "`Processing.Processor` cannot be re-used") + } + } + }) + + t.Run("new processors each time", func(t *testing.T) { + var processors []beat.Processor + for _, c := range r.cfgs { + processors = append(processors, c.Processing.Processor.All()...) + } + + require.NotEmptyf(t, processors, "for this test the list of processors cannot be empty") + + for i, p1 := range processors { + for j, p2 := range processors { + if i == j { + continue + } + + require.NotSamef(t, p1, p2, "processors must not be re-used") + } + } + }) +} + +type clientMock struct { + cfg beat.ClientConfig +} + +func (clientMock) Publish(beat.Event) {} +func (clientMock) PublishAll([]beat.Event) {} +func (clientMock) Close() error { return nil } + +type pipelineConnectorMock struct{} + +func (pipelineConnectorMock) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) { + client := &clientMock{ + cfg: cfg, + } + return client, nil +} + +func (pipelineConnectorMock) Connect() (beat.Client, error) { + return &clientMock{}, nil +} diff --git a/filebeat/channel/runner_test.go b/filebeat/channel/runner_test.go index cf42a38e4b8e..f683c3863d32 100644 --- a/filebeat/channel/runner_test.go +++ b/filebeat/channel/runner_test.go @@ -30,6 +30,13 @@ import ( "github.com/elastic/beats/v7/libbeat/common" "github.com/elastic/beats/v7/libbeat/processors" "github.com/elastic/beats/v7/libbeat/processors/actions" +<<<<<<< HEAD +======= + _ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata" + _ "github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/mapstr" +>>>>>>> 3d917c8519 (Add test for the processor re-use issue (#34870)) ) func TestProcessorsForConfig(t *testing.T) { @@ -210,3 +217,39 @@ func makeProcessors(procs ...processors.Processor) *processors.Processors { procList.List = procs return procList } + +func TestRunnerFactoryWithCommonInputSettings(t *testing.T) { + + // we use `add_kubernetes_metadata` and `add_cloud_metadata` + // for testing because initially the problem we've discovered + // was visible with these 2 processors. + configYAML := ` +processors: + - add_kubernetes_metadata: ~ + - add_cloud_metadata: ~ +keep_null: true +publisher_pipeline: + disable_host: true +type: "filestream" +service.type: "module" +pipeline: "test" +index: "%{[fields.log_type]}-%{[agent.version]}-%{+yyyy.MM.dd}" +` + cfg, err := conf.NewConfigWithYAML([]byte(configYAML), configYAML) + require.NoError(t, err) + + b := beat.Info{} // not important for the test + rf := &runnerFactoryMock{ + clientCount: 3, // we will create 3 clients from the wrapped pipeline + } + pcm := &pipelineConnectorMock{} // creates mock pipeline clients and will get wrapped + + rfwc := RunnerFactoryWithCommonInputSettings(b, rf) + + // create a wrapped runner, our mock runner will + // create the given amount of clients here using the wrapped pipeline connector. + _, err = rfwc.Create(pcm, cfg) + require.NoError(t, err) + + rf.Assert(t) +}