From 3b042fabd450b8763f96c82def8d3b49d2783901 Mon Sep 17 00:00:00 2001 From: ShourieG Date: Mon, 16 Dec 2024 14:45:13 +0530 Subject: [PATCH] [filebeat][streaming] - Fix for streaming input handling of invalid or empty websocket messages (#42036) * Fix for streaming input handling of invalid or empty websocket messages (cherry picked from commit d508a408b0e9f8a6ea0918186e8facc5e78a0b3e) --- CHANGELOG.next.asciidoc | 19 ++++++++++++ x-pack/filebeat/input/streaming/websocket.go | 32 ++++++++++---------- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 42499bc997b7..370452ff965b 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -114,6 +114,25 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Fix publication of group data from the Okta entity analytics provider. {pull}40681[40681] - Ensure netflow custom field configuration is applied. {issue}40735[40735] {pull}40730[40730] - Fix a bug in Salesforce input to only handle responses with 200 status code {pull}41015[41015] +- Fixed failed job handling and removed false-positive error logs in the GCS input. {pull}41142[41142] +- Bump github.com/elastic/go-sfdc dependency used by x-pack/filebeat/input/salesforce. {pull}41192[41192] +- Log bad handshake details when websocket connection fails {pull}41300[41300] +- Improve modification time handling for entities and entity deletion logic in the Active Directory entityanalytics input. {pull}41179[41179] +- Journald input now can read events from all boots {issue}41083[41083] {pull}41244[41244] +- Fix double encoding of client_secret in the Entity Analytics input's Azure Active Directory provider {pull}41393[41393] +- Fix aws region in aws-s3 input s3 polling mode. {pull}41572[41572] +- Fix errors in SQS host resolution in the `aws-s3` input when using custom (non-AWS) endpoints. {pull}41504[41504] +- Fix double encoding of client_secret in the Entity Analytics input's Azure Active Directory provider {pull}41393[41393] +- The azure-eventhub input now correctly reports its status to the Elastic Agent on fatal errors {pull}41469[41469] +- Add support for Access Points in the `aws-s3` input. {pull}41495[41495] +- Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664] +- Fix missing key in streaming input logging. {pull}41600[41600] +- Improve S3 object size metric calculation to support situations where Content-Length is not available. {pull}41755[41755] +- Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765] +- Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583] +- Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920] +- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] +- Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036] *Heartbeat* diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index a8fbd4c664c9..97cfaec8d777 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -118,25 +118,25 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { _, message, err := c.ReadMessage() if err != nil { s.metrics.errorsTotal.Inc() - if isRetryableError(err) { - s.log.Debugw("websocket connection encountered an error, attempting to reconnect...", "error", err) - // close the old connection and reconnect - if err := c.Close(); err != nil { - s.metrics.errorsTotal.Inc() - s.log.Errorw("encountered an error while closing the websocket connection", "error", err) - } - // since c is already a pointer, we can reassign it to the new connection and the defer func will still handle it - c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) - handleConnectionResponse(resp, s.metrics, s.log) - if err != nil { - s.metrics.errorsTotal.Inc() - s.log.Errorw("failed to reconnect websocket connection", "error", err) - return err - } - } else { + if !isRetryableError(err) { s.log.Errorw("failed to read websocket data", "error", err) return err } + s.log.Debugw("websocket connection encountered an error, attempting to reconnect...", "error", err) + // close the old connection and reconnect + if err := c.Close(); err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + } + // since c is already a pointer, we can reassign it to the new connection and the defer func will still handle it + c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) + handleConnectionResponse(resp, s.metrics, s.log) + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to reconnect websocket connection", "error", err) + return err + } + continue } s.metrics.receivedBytesTotal.Add(uint64(len(message))) state["response"] = message