diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index c87e8d1e956c..2b8b161430b3 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -150,6 +150,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377] - Improve httpjson documentation for split processor. {pull}33473[33473] - Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499] +- Cloud Foundry input uses server-side filtering when retrieving logs. {pull}33456[33456] *Auditbeat* diff --git a/x-pack/filebeat/input/cloudfoundry/v1.go b/x-pack/filebeat/input/cloudfoundry/v1.go index 6b0b5077b135..b80fba36b1bb 100644 --- a/x-pack/filebeat/input/cloudfoundry/v1.go +++ b/x-pack/filebeat/input/cloudfoundry/v1.go @@ -8,7 +8,7 @@ package cloudfoundry import ( - "github.com/pkg/errors" + "fmt" v2 "github.com/elastic/beats/v7/filebeat/input/v2" stateless "github.com/elastic/beats/v7/filebeat/input/v2/input-stateless" @@ -51,7 +51,7 @@ func (i *inputV1) Run(ctx v2.Context, publisher stateless.Publisher) error { consumer, err := hub.DopplerConsumer(callbacks) if err != nil { - return errors.Wrapf(err, "initializing doppler consumer") + return fmt.Errorf("initializing doppler consumer: %w", err) } stopCtx, cancel := ctxtool.WithFunc(ctx.Cancelation, func() { diff --git a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go index a2ec852b66c9..f33a2f1f7460 100644 --- a/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go +++ b/x-pack/libbeat/common/cloudfoundry/dopplerconsumer.go @@ -80,27 +80,15 @@ func (c *DopplerConsumer) Run() { } func (c *DopplerConsumer) logsFirehose() { - c.firehose(c.callbacks.Log, consumer.LogMessages) + c.firehose(c.callbacks.Log, filterLogs, consumer.LogMessages) } func (c *DopplerConsumer) metricsFirehose() { - c.firehose(c.callbacks.Metric, consumer.Metrics) -} - -func (c *DopplerConsumer) firehose(cb func(evt Event), filter consumer.EnvelopeFilter) { - var msgChan <-chan *events.Envelope - var errChan <-chan error - filterFn := filterNoFilter - if filter == consumer.LogMessages { - // We are interested in more envelopes than the ones obtained when filtering - // by log messages, retrieve them all and filter later. - // If this causes performance or other problems, we will have to investigate - // if it is possible to pass different filters to the firehose url. - filterFn = filterLogs - msgChan, errChan = c.consumer.Firehose(c.subscriptionID, "") - } else { - msgChan, errChan = c.consumer.FilteredFirehose(c.subscriptionID, "", filter) - } + c.firehose(c.callbacks.Metric, filterNoFilter, consumer.Metrics) +} + +func (c *DopplerConsumer) firehose(cb func(evt Event), filterFn func(*events.Envelope) bool, filter consumer.EnvelopeFilter) { + msgChan, errChan := c.consumer.FilteredFirehose(c.subscriptionID, "", filter) for { select { case env := <-msgChan: