Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for the processor re-use issue #34870

Merged
merged 3 commits into from
Mar 21, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion filebeat/channel/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (f *onCreateFactory) Create(pipeline beat.PipelineConnector, cfg *conf.C) (
// - *processors*: list of local processors to be added to the processing pipeline
// - *keep_null*: keep or remove 'null' from events to be published
// - *_module_name* (hidden setting): Add fields describing the module name
// - *_ fileset_name* (hiddrn setting):
// - *_ fileset_name* (hidden setting):
// - *pipeline*: Configure the ES Ingest Node pipeline name to be used for events from this input
// - *index*: Configure the index name for events to be collected from this input
// - *type*: implicit event type
Expand Down
118 changes: 118 additions & 0 deletions filebeat/channel/runner_mock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package channel

import (
"testing"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/cfgfile"

conf "github.com/elastic/elastic-agent-libs/config"

"github.com/stretchr/testify/require"
)

type runnerFactoryMock struct {
cfgfile.RunnerFactory
rdner marked this conversation as resolved.
Show resolved Hide resolved
clientCount int
cfgs []beat.ClientConfig
}

func (r *runnerFactoryMock) Create(p beat.PipelineConnector, config *conf.C) (cfgfile.Runner, error) {
// When using the connector multiple times to create a client
// it's using the same editor function for creating a new client
// with a modified configuration that includes predefined processing.
// This is why we must make sure nothing is re-used from one client to another.
for i := 0; i < r.clientCount; i++ {
client, err := p.ConnectWith(beat.ClientConfig{})
if err != nil {
return nil, err
}

// storing the config that the client was created with
// it's needed for the `Assert` later
r.cfgs = append(r.cfgs, client.(*clientMock).cfg)
}
return &struct {
cfgfile.Runner
}{}, nil
}

// Assert runs various checks for the clients created by the wrapped pipeline connector
// We check that the processing configuration does not reference the same addresses as before,
// re-using some parts of the processing configuration will result in various issues, such as:
// * closing processors multiple times
// * using closed processors
// * modifiying an object shared by multiple pipeline clients
func (r runnerFactoryMock) Assert(t *testing.T) {
t.Helper()

// we need to make sure `Assert` is called after `Create`
require.Len(t, r.cfgs, r.clientCount)

t.Run("new processing configuration each time", func(t *testing.T) {
for i, c1 := range r.cfgs {
for j, c2 := range r.cfgs {
if i == j {
continue
}

require.NotSamef(t, c1.Processing, c2.Processing, "processing configuration cannot be re-used")
require.NotSamef(t, c1.Processing.Meta, c2.Processing.Meta, "`Processing.Meta` cannot be re-used")
require.NotSamef(t, c1.Processing.Fields, c2.Processing.Fields, "`Processing.Fields` cannot be re-used")
require.NotSamef(t, c1.Processing.Processor, c2.Processing.Processor, "`Processing.Processor` cannot be re-used")
}
}
})

t.Run("new processors each time", func(t *testing.T) {
var processors []beat.Processor
for _, c := range r.cfgs {
processors = append(processors, c.Processing.Processor.All()...)
}

require.NotEmptyf(t, processors, "for this test the list of processors cannot be empty")

for i, p1 := range processors {
for j, p2 := range processors {
if i == j {
continue
}

require.NotSamef(t, p1, p2, "processors must not be re-used")
}
}
})
}

type clientMock struct {
beat.Client
rdner marked this conversation as resolved.
Show resolved Hide resolved
cfg beat.ClientConfig
}

type pipelineConnectorMock struct {
beat.Pipeline
rdner marked this conversation as resolved.
Show resolved Hide resolved
}

func (p *pipelineConnectorMock) ConnectWith(cfg beat.ClientConfig) (beat.Client, error) {
client := &clientMock{
cfg: cfg,
}
return client, nil
}
38 changes: 38 additions & 0 deletions filebeat/channel/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ import (
"github.com/elastic/beats/v7/libbeat/beat/events"
"github.com/elastic/beats/v7/libbeat/processors"
"github.com/elastic/beats/v7/libbeat/processors/actions"
_ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_kubernetes_metadata"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/mapstr"
)
Expand Down Expand Up @@ -211,3 +213,39 @@ func makeProcessors(procs ...processors.Processor) *processors.Processors {
procList.List = procs
return procList
}

func TestRunnerFactoryWithCommonInputSettings(t *testing.T) {

// we use `add_kubernetes_metadata` and `add_cloud_metadata`
// for testing because initially the problem we've discovered
// was visible with these 2 processors.
configYAML := `
processors:
- add_kubernetes_metadata: ~
- add_cloud_metadata: ~
keep_null: true
publisher_pipeline:
disable_host: true
type: "filestream"
service.type: "module"
pipeline: "test"
index: "%{[fields.log_type]}-%{[agent.version]}-%{+yyyy.MM.dd}"
`
cfg, err := conf.NewConfigWithYAML([]byte(configYAML), configYAML)
require.NoError(t, err)

b := beat.Info{} // not important for the test
rf := &runnerFactoryMock{
clientCount: 3, // we will create 3 clients from the wrapped pipeline
}
pcm := &pipelineConnectorMock{} // creates mock pipeline clients and will get wrapped

rfwc := RunnerFactoryWithCommonInputSettings(b, rf)

// create a wrapped runner, our mock runner will
// create the given amount of clients here using the wrapped pipeline connector.
_, err = rfwc.Create(pcm, cfg)
require.NoError(t, err)

rf.Assert(t)
}