From 31adbf407182227c73b003d5793df9bea193fefc Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Mon, 4 Dec 2023 19:52:49 -0600 Subject: [PATCH 1/6] filebeat benchmark input simple input that generates synthetic events, useful for benchmarking outputs --- filebeat/docs/filebeat-options.asciidoc | 3 + .../docs/inputs/input-benchmark.asciidoc | 93 +++++++++ x-pack/filebeat/input/benchmark/config.go | 31 +++ .../filebeat/input/benchmark/config_test.go | 33 ++++ x-pack/filebeat/input/benchmark/input.go | 177 ++++++++++++++++++ .../input/default-inputs/inputs_other.go | 2 + 6 files changed, 339 insertions(+) create mode 100644 x-pack/filebeat/docs/inputs/input-benchmark.asciidoc create mode 100644 x-pack/filebeat/input/benchmark/config.go create mode 100644 x-pack/filebeat/input/benchmark/config_test.go create mode 100644 x-pack/filebeat/input/benchmark/input.go diff --git a/filebeat/docs/filebeat-options.asciidoc b/filebeat/docs/filebeat-options.asciidoc index 1e9f9cac6e06..13e4ffde4993 100644 --- a/filebeat/docs/filebeat-options.asciidoc +++ b/filebeat/docs/filebeat-options.asciidoc @@ -70,6 +70,7 @@ You can configure {beatname_uc} to use the following inputs: * <<{beatname_lc}-input-aws-s3>> * <<{beatname_lc}-input-azure-eventhub>> * <<{beatname_lc}-input-azure-blob-storage>> +* <<{beatname_lc}-input-benchmark>> * <<{beatname_lc}-input-cel>> * <<{beatname_lc}-input-cloudfoundry>> * <<{beatname_lc}-input-cometd>> @@ -104,6 +105,8 @@ include::../../x-pack/filebeat/docs/inputs/input-azure-eventhub.asciidoc[] include::../../x-pack/filebeat/docs/inputs/input-azure-blob-storage.asciidoc[] +include::../../x-pack/filebeat/docs/inputs/input-benchmark.asciidoc[] + include::../../x-pack/filebeat/docs/inputs/input-cel.asciidoc[] include::../../x-pack/filebeat/docs/inputs/input-cloudfoundry.asciidoc[] diff --git a/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc b/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc new file mode 100644 index 000000000000..dffa47013fdc --- /dev/null +++ b/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc @@ -0,0 +1,93 @@ +[role="xpack"] + +:type: benchmark + +[id="{beatname_lc}-input-{type}"] +=== Benchmark input + +++++ +generic event generator +++++ + +beta[] + +The benchmark input generates generic events and sends them to the output. This can be useful when you want to benchmark the difference between outputs or output settings. + +Example configurations: + +Basic example, infinite events as quickly as possible: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: benchmark + enabled: true + message: "test message" + threads: 1 +---- + +Send 1024 events and stop example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: benchmark + enabled: true + message: "test message" + threads: 1 + count: 1024 +---- + +Send 5 events per second example: +["source","yaml",subs="attributes"] +---- +{beatname_lc}.inputs: +- type: benchmark + enabled: true + message: "test message" + threads: 1 + eps: 5 +---- + +==== Configuration options + +The `benchmark` input supports the following configuration options plus the +<<{beatname_lc}-input-{type}-common-options>> described later. + +[float] +==== `message` + +This is the value that will be in the `message` field of the json document. + +[float] +==== `threads` + +This is the number of go routines that will be started generating messages. Normally 1 thread can saturate an output but if necessary this can be increased. + +[float] +==== `count` + +This is the number of messages to send. 0 represents sending infinite messages. This is mutually exclusive with the `eps` option. + +[float] +==== `eps` + +This is the number of events per second to send. 0 represents sending as quickly as possible. This is mutually exclusive with the `count` option. + + +[float] +=== Metrics + +This input exposes metrics under the <>. +These metrics are exposed under the `/inputs` path. They can be used to +observe the activity of the input. + +[options="header"] +|======= +| Metric | Description +| `events_published_total` | Number of events published. +| `publishing_time` | Histogram of the elapsed in nanoseconds (time of publisher.Publish). +|======= + +[id="{beatname_lc}-input-{type}-common-options"] +include::../../../../filebeat/docs/inputs/input-common-options.asciidoc[] + +:type!: diff --git a/x-pack/filebeat/input/benchmark/config.go b/x-pack/filebeat/input/benchmark/config.go new file mode 100644 index 000000000000..e26182476a95 --- /dev/null +++ b/x-pack/filebeat/input/benchmark/config.go @@ -0,0 +1,31 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package benchmark + +import "fmt" + +type benchmarkConfig struct { + Message string `config:"message"` + Count uint64 `config:"count"` + Threads uint8 `config:"threads"` + Eps uint64 `config:"eps"` +} + +var ( + defaultConfig = benchmarkConfig{ + Message: "generic benchmark message", + Threads: 1, + } +) + +func (c *benchmarkConfig) Validate() error { + if c.Count > 0 && c.Eps > 0 { + return fmt.Errorf("only one of count or eps may be specified, not both") + } + if c.Message == "" { + return fmt.Errorf("message must be specified") + } + return nil +} diff --git a/x-pack/filebeat/input/benchmark/config_test.go b/x-pack/filebeat/input/benchmark/config_test.go new file mode 100644 index 000000000000..2bf5b2dede86 --- /dev/null +++ b/x-pack/filebeat/input/benchmark/config_test.go @@ -0,0 +1,33 @@ +package benchmark + +import ( + "strings" + "testing" +) + +func TestValidate(t *testing.T) { + tests := map[string]struct { + cfg benchmarkConfig + expectError bool + errorString string + }{ + "default": {cfg: defaultConfig}, + "countAndEps": {cfg: benchmarkConfig{Message: "a", Count: 1, Eps: 1}, expectError: true, errorString: "only one of count or eps may be specified"}, + "empty": {cfg: benchmarkConfig{}, expectError: true, errorString: "message must be specified"}, + } + + for name, tc := range tests { + t.Run(name, func(t *testing.T) { + err := tc.cfg.Validate() + if err == nil && tc.expectError == true { + t.Fatalf("expected validation error, didn't get it") + } + if err != nil && tc.expectError == false { + t.Fatalf("unexpected validation error: %s", err) + } + if err != nil && !strings.Contains(err.Error(), tc.errorString) { + t.Fatalf("error: '%s' didn't contain expected string: '%s'", err, tc.errorString) + } + }) + } +} diff --git a/x-pack/filebeat/input/benchmark/input.go b/x-pack/filebeat/input/benchmark/input.go new file mode 100644 index 000000000000..69eb268945bf --- /dev/null +++ b/x-pack/filebeat/input/benchmark/input.go @@ -0,0 +1,177 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package benchmark + +import ( + "sync" + "time" + + "github.com/rcrowley/go-metrics" + + v2 "github.com/elastic/beats/v7/filebeat/input/v2" + stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/feature" + "github.com/elastic/beats/v7/libbeat/monitoring/inputmon" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/monitoring" + "github.com/elastic/elastic-agent-libs/monitoring/adapter" +) + +const ( + inputName = "benchmark" +) + +// Plugin registers the input +func Plugin() v2.Plugin { + return v2.Plugin{ + Name: inputName, + Stability: feature.Experimental, + Manager: stateless.NewInputManager(configure), + } +} + +func configure(cfg *config.C) (stateless.Input, error) { + bConf := defaultConfig + if err := cfg.Unpack(&bConf); err != nil { + return nil, err + } + return &benchmarkInput{cfg: bConf}, nil +} + +// benchmarkInput is the main runtime object for the input +type benchmarkInput struct { + cfg benchmarkConfig +} + +// Name returns the name of the input +func (bi *benchmarkInput) Name() string { + return inputName +} + +// Test validates the configuration +func (bi *benchmarkInput) Test(ctx v2.TestContext) error { + return bi.cfg.Validate() +} + +// Run starts the data generation. +func (bi *benchmarkInput) Run(ctx v2.Context, publisher stateless.Publisher) error { + var wg sync.WaitGroup + metrics := newInputMetrics(ctx.ID) + + for i := uint8(0); i < bi.cfg.Threads; i++ { + wg.Add(1) + go func(thread uint8) { + defer wg.Done() + runThread(ctx, publisher, thread, bi.cfg, metrics) + }(i) + } + wg.Wait() + return ctx.Cancelation.Err() +} + +func runThread(ctx v2.Context, publisher stateless.Publisher, thread uint8, cfg benchmarkConfig, metrics *inputMetrics) { + ctx.Logger.Infof("starting benchmark input thread: %d", thread) + defer ctx.Logger.Infof("stopping benchmark input thread: %d", thread) + + var line uint64 + var name uint64 + + switch { + case cfg.Count > 0: + for { + select { + case <-ctx.Cancelation.Done(): + return + default: + publishEvt(publisher, cfg.Message, line, name, thread, metrics) + line++ + if (line % cfg.Count) == 0 { + return + } + } + } + case cfg.Eps > 0: + ticker := time.NewTicker(1 * time.Second) + pubChan := make(chan bool, int(cfg.Eps)) + for { + select { + case <-ctx.Cancelation.Done(): + ticker.Stop() + return + case <-ticker.C: + //don't want to block on filling doPublish channel + //so only send as many as it can hold right now + numToSend := cap(pubChan) - len(pubChan) + for i := 0; i < numToSend; i++ { + pubChan <- true + } + case <-pubChan: + publishEvt(publisher, cfg.Message, line, name, thread, metrics) + line++ + if line == 0 { + name++ + } + } + } + default: + for { + select { + case <-ctx.Cancelation.Done(): + return + default: + publishEvt(publisher, cfg.Message, line, name, thread, metrics) + line++ + if line == 0 { + name++ + } + } + } + } + return +} + +func publishEvt(publisher stateless.Publisher, msg string, line uint64, filename uint64, thread uint8, metrics *inputMetrics) { + timestamp := time.Now() + evt := beat.Event{ + Timestamp: timestamp, + Fields: mapstr.M{ + "message": msg, + "line": line, + "filename": filename, + "thread": thread, + }, + } + publisher.Publish(evt) + metrics.publishingTime.Update(time.Since(timestamp).Nanoseconds()) + metrics.eventsPublished.Add(1) +} + +type inputMetrics struct { + unregister func() + + eventsPublished *monitoring.Uint // number of events published + publishingTime metrics.Sample // histogram of the elapsed times in nanoseconds (time of publisher.Publish) +} + +// newInputMetrics returns an input metric for the benchmark processor. +func newInputMetrics(id string) *inputMetrics { + reg, unreg := inputmon.NewInputRegistry(inputName, id, nil) + out := &inputMetrics{ + unregister: unreg, + eventsPublished: monitoring.NewUint(reg, "events_published_total"), + publishingTime: metrics.NewUniformSample(1024), + } + + _ = adapter.NewGoMetrics(reg, "publishing_time", adapter.Accept). + Register("histogram", metrics.NewHistogram(out.publishingTime)) + + return out +} + +func (m *inputMetrics) Close() { + m.unregister() +} diff --git a/x-pack/filebeat/input/default-inputs/inputs_other.go b/x-pack/filebeat/input/default-inputs/inputs_other.go index ab682e4e0010..e53538fbcef8 100644 --- a/x-pack/filebeat/input/default-inputs/inputs_other.go +++ b/x-pack/filebeat/input/default-inputs/inputs_other.go @@ -13,6 +13,7 @@ import ( "github.com/elastic/beats/v7/x-pack/filebeat/input/awscloudwatch" "github.com/elastic/beats/v7/x-pack/filebeat/input/awss3" "github.com/elastic/beats/v7/x-pack/filebeat/input/azureblobstorage" + "github.com/elastic/beats/v7/x-pack/filebeat/input/benchmark" "github.com/elastic/beats/v7/x-pack/filebeat/input/cel" "github.com/elastic/beats/v7/x-pack/filebeat/input/cloudfoundry" "github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics" @@ -45,5 +46,6 @@ func xpackInputs(info beat.Info, log *logp.Logger, store beater.StateStore) []v2 shipper.Plugin(log, store), websocket.Plugin(log, store), netflow.Plugin(log), + benchmark.Plugin(), } } From 7c1b05d81e2a22b45c52944611feaa47a44ca691 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Wed, 13 Dec 2023 13:41:37 -0600 Subject: [PATCH 2/6] add changelog --- CHANGELOG.next.asciidoc | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 37fc00bd9262..f82b536b61ae 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -203,6 +203,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Update CEL mito extensions to v1.10.0 to add keys/values helper. {pull}38504[38504] - Add support for Active Directory an entity analytics provider. {pull}37919[37919] - Add debugging breadcrumb to logs when writing request trace log. {pull}38636[38636] +- added benchmark input {pull}37437[37437] *Auditbeat* From d3bb2149054435b29c8602c4be87e9688c7acf87 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Wed, 13 Dec 2023 14:04:25 -0600 Subject: [PATCH 3/6] fix mage check errors --- x-pack/filebeat/input/benchmark/config_test.go | 4 ++++ x-pack/filebeat/input/benchmark/input.go | 1 - 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/x-pack/filebeat/input/benchmark/config_test.go b/x-pack/filebeat/input/benchmark/config_test.go index 2bf5b2dede86..0481485d7e83 100644 --- a/x-pack/filebeat/input/benchmark/config_test.go +++ b/x-pack/filebeat/input/benchmark/config_test.go @@ -1,3 +1,7 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + package benchmark import ( diff --git a/x-pack/filebeat/input/benchmark/input.go b/x-pack/filebeat/input/benchmark/input.go index 69eb268945bf..832fb4d87087 100644 --- a/x-pack/filebeat/input/benchmark/input.go +++ b/x-pack/filebeat/input/benchmark/input.go @@ -131,7 +131,6 @@ func runThread(ctx v2.Context, publisher stateless.Publisher, thread uint8, cfg } } } - return } func publishEvt(publisher stateless.Publisher, msg string, line uint64, filename uint64, thread uint8, metrics *inputMetrics) { From 729f0e3d1c87ab8b55773249be69521546ae6348 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Fri, 22 Dec 2023 19:59:23 -0600 Subject: [PATCH 4/6] add a discard output --- CHANGELOG.next.asciidoc | 1 + libbeat/docs/outputs-list.asciidoc | 10 +++ libbeat/outputs/discard/config.go | 30 +++++++ libbeat/outputs/discard/discard.go | 82 +++++++++++++++++++ libbeat/outputs/discard/docs/discard.asciidoc | 34 ++++++++ libbeat/publisher/includes/includes.go | 1 + .../docs/inputs/input-benchmark.asciidoc | 6 +- 7 files changed, 161 insertions(+), 3 deletions(-) create mode 100644 libbeat/outputs/discard/config.go create mode 100644 libbeat/outputs/discard/discard.go create mode 100644 libbeat/outputs/discard/docs/discard.asciidoc diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index f82b536b61ae..8dd559e18450 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -204,6 +204,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Add support for Active Directory an entity analytics provider. {pull}37919[37919] - Add debugging breadcrumb to logs when writing request trace log. {pull}38636[38636] - added benchmark input {pull}37437[37437] +- added benchmark input and discard output {pull}37437[37437] *Auditbeat* diff --git a/libbeat/docs/outputs-list.asciidoc b/libbeat/docs/outputs-list.asciidoc index 4181c10f64f6..bf6bda35094b 100644 --- a/libbeat/docs/outputs-list.asciidoc +++ b/libbeat/docs/outputs-list.asciidoc @@ -24,6 +24,9 @@ endif::[] ifndef::no_console_output[] * <> endif::[] +ifndef::no_discard_output[] +* <> +endif::[] //# end::outputs-list[] @@ -77,6 +80,13 @@ endif::[] include::{libbeat-outputs-dir}/console/docs/console.asciidoc[] endif::[] +ifndef::no_discard_output[] +ifdef::requires_xpack[] +[role="xpack"] +endif::[] +include::{libbeat-outputs-dir}/discard/docs/discard.asciidoc[] +endif::[] + ifndef::no_codec[] ifdef::requires_xpack[] [role="xpack"] diff --git a/libbeat/outputs/discard/config.go b/libbeat/outputs/discard/config.go new file mode 100644 index 000000000000..ffdb6c038b37 --- /dev/null +++ b/libbeat/outputs/discard/config.go @@ -0,0 +1,30 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package discard + +import ( + "github.com/elastic/elastic-agent-libs/config" +) + +type discardOutConfig struct { + Queue config.Namespace `config:"queue"` +} + +func defaultConfig() discardOutConfig { + return discardOutConfig{} +} diff --git a/libbeat/outputs/discard/discard.go b/libbeat/outputs/discard/discard.go new file mode 100644 index 000000000000..e348c2eb1b11 --- /dev/null +++ b/libbeat/outputs/discard/discard.go @@ -0,0 +1,82 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package discard + +import ( + "context" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/outputs" + "github.com/elastic/beats/v7/libbeat/publisher" + "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" +) + +func init() { + outputs.RegisterType("discard", makeDiscard) +} + +type discardOutput struct { + log *logp.Logger + beat beat.Info + observer outputs.Observer +} + +func makeDiscard( + _ outputs.IndexManager, + beat beat.Info, + observer outputs.Observer, + cfg *config.C, +) (outputs.Group, error) { + out := &discardOutput{ + log: logp.NewLogger("discard"), + beat: beat, + observer: observer, + } + doConfig := defaultConfig() + if err := cfg.Unpack(&doConfig); err != nil { + return outputs.Fail(err) + } + + // disable bulk support in publisher pipeline + _ = cfg.SetInt("bulk_max_size", -1, -1) + out.log.Infof("Initialized discard output") + return outputs.Success(doConfig.Queue, -1, 0, out) +} + +// Implement Outputer +func (out *discardOutput) Close() error { + return nil +} + +func (out *discardOutput) Publish(_ context.Context, batch publisher.Batch) error { + defer batch.ACK() + + st := out.observer + events := batch.Events() + st.NewBatch(len(events)) + for range events { + st.WriteError(nil) + } + st.Acked(len(events)) + return nil +} + +func (out *discardOutput) String() string { + return "discard" +} diff --git a/libbeat/outputs/discard/docs/discard.asciidoc b/libbeat/outputs/discard/docs/discard.asciidoc new file mode 100644 index 000000000000..b17ca18761dd --- /dev/null +++ b/libbeat/outputs/discard/docs/discard.asciidoc @@ -0,0 +1,34 @@ +[[discard-output]] +=== Configure the Discard output + +++++ +Discard +++++ + +The Discard output throws away data. + +WARNING: The Discard output should be used only for development or +debugging issues. Data is lost. + +This can be useful if you want to work on your input configuration +without needing to configure an output. It can also be useful to test +how changes in input and processor configuration affect performance. + +Example configuration: + +["source","yaml",subs="attributes"] +------------------------------------------------------------------------------ +output.discard: + enabled: true +------------------------------------------------------------------------------ + +==== Configuration options + +You can specify the following `output.discard` options in the +{beatname_lc}.yml+ config file: + +===== `enabled` + +The enabled config is a boolean setting to enable or disable the output. If set +to false, the output is disabled. + +The default value is `true`. diff --git a/libbeat/publisher/includes/includes.go b/libbeat/publisher/includes/includes.go index 84622ad10f1a..c1e2d02e3cfd 100644 --- a/libbeat/publisher/includes/includes.go +++ b/libbeat/publisher/includes/includes.go @@ -22,6 +22,7 @@ import ( _ "github.com/elastic/beats/v7/libbeat/outputs/codec/format" _ "github.com/elastic/beats/v7/libbeat/outputs/codec/json" _ "github.com/elastic/beats/v7/libbeat/outputs/console" + _ "github.com/elastic/beats/v7/libbeat/outputs/discard" _ "github.com/elastic/beats/v7/libbeat/outputs/elasticsearch" _ "github.com/elastic/beats/v7/libbeat/outputs/fileout" _ "github.com/elastic/beats/v7/libbeat/outputs/kafka" diff --git a/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc b/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc index dffa47013fdc..aeab32e6f4a7 100644 --- a/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc @@ -6,12 +6,12 @@ === Benchmark input ++++ -generic event generator +Benchmark ++++ beta[] -The benchmark input generates generic events and sends them to the output. This can be useful when you want to benchmark the difference between outputs or output settings. +The Benchmark input generates generic events and sends them to the output. This can be useful when you want to benchmark the difference between outputs or output settings. Example configurations: @@ -49,7 +49,7 @@ Send 5 events per second example: ==== Configuration options -The `benchmark` input supports the following configuration options plus the +The Benchmark input supports the following configuration options plus the <<{beatname_lc}-input-{type}-common-options>> described later. [float] From e0bb275085f1d70d422069dea8cdaa7c873db43e Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Tue, 12 Mar 2024 18:34:59 -0500 Subject: [PATCH 5/6] fixes based on review --- libbeat/outputs/discard/discard.go | 3 --- libbeat/outputs/discard/docs/discard.asciidoc | 4 ++-- x-pack/filebeat/docs/inputs/input-benchmark.asciidoc | 8 ++++---- x-pack/filebeat/input/benchmark/input.go | 2 +- 4 files changed, 7 insertions(+), 10 deletions(-) diff --git a/libbeat/outputs/discard/discard.go b/libbeat/outputs/discard/discard.go index e348c2eb1b11..a33462f5c2a7 100644 --- a/libbeat/outputs/discard/discard.go +++ b/libbeat/outputs/discard/discard.go @@ -70,9 +70,6 @@ func (out *discardOutput) Publish(_ context.Context, batch publisher.Batch) erro st := out.observer events := batch.Events() st.NewBatch(len(events)) - for range events { - st.WriteError(nil) - } st.Acked(len(events)) return nil } diff --git a/libbeat/outputs/discard/docs/discard.asciidoc b/libbeat/outputs/discard/docs/discard.asciidoc index b17ca18761dd..3e2990cb93ba 100644 --- a/libbeat/outputs/discard/docs/discard.asciidoc +++ b/libbeat/outputs/discard/docs/discard.asciidoc @@ -8,10 +8,10 @@ The Discard output throws away data. WARNING: The Discard output should be used only for development or -debugging issues. Data is lost. +debugging issues. Data is lost. This can be useful if you want to work on your input configuration -without needing to configure an output. It can also be useful to test +without needing to configure an output. It can also be useful to test how changes in input and processor configuration affect performance. Example configuration: diff --git a/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc b/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc index aeab32e6f4a7..db8036973357 100644 --- a/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc +++ b/x-pack/filebeat/docs/inputs/input-benchmark.asciidoc @@ -11,7 +11,7 @@ beta[] -The Benchmark input generates generic events and sends them to the output. This can be useful when you want to benchmark the difference between outputs or output settings. +The Benchmark input generates generic events and sends them to the output. This can be useful when you want to benchmark the difference between outputs or output settings. Example configurations: @@ -60,17 +60,17 @@ This is the value that will be in the `message` field of the json document. [float] ==== `threads` -This is the number of go routines that will be started generating messages. Normally 1 thread can saturate an output but if necessary this can be increased. +This is the number of goroutines that will be started generating messages. Normally 1 thread can saturate an output but if necessary this can be increased. [float] ==== `count` -This is the number of messages to send. 0 represents sending infinite messages. This is mutually exclusive with the `eps` option. +This is the number of messages to send. 0 represents sending infinite messages. This is mutually exclusive with the `eps` option. [float] ==== `eps` -This is the number of events per second to send. 0 represents sending as quickly as possible. This is mutually exclusive with the `count` option. +This is the number of events per second to send. 0 represents sending as quickly as possible. This is mutually exclusive with the `count` option. [float] diff --git a/x-pack/filebeat/input/benchmark/input.go b/x-pack/filebeat/input/benchmark/input.go index 832fb4d87087..dd6d198cc409 100644 --- a/x-pack/filebeat/input/benchmark/input.go +++ b/x-pack/filebeat/input/benchmark/input.go @@ -89,7 +89,7 @@ func runThread(ctx v2.Context, publisher stateless.Publisher, thread uint8, cfg default: publishEvt(publisher, cfg.Message, line, name, thread, metrics) line++ - if (line % cfg.Count) == 0 { + if line == cfg.Count { return } } From 497c8c69f03199625eee771404c6acfb16c74ddf Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Wed, 10 Apr 2024 18:43:45 -0500 Subject: [PATCH 6/6] fix for encoder factory --- libbeat/outputs/discard/discard.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/libbeat/outputs/discard/discard.go b/libbeat/outputs/discard/discard.go index a33462f5c2a7..c9a51b0f33df 100644 --- a/libbeat/outputs/discard/discard.go +++ b/libbeat/outputs/discard/discard.go @@ -56,7 +56,7 @@ func makeDiscard( // disable bulk support in publisher pipeline _ = cfg.SetInt("bulk_max_size", -1, -1) out.log.Infof("Initialized discard output") - return outputs.Success(doConfig.Queue, -1, 0, out) + return outputs.Success(doConfig.Queue, -1, 0, nil, out) } // Implement Outputer