diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index bb255000f4e1..e83cd1db1863 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -124,6 +124,9 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - The azure-eventhub input now correctly reports its status to the Elastic Agent on fatal errors {pull}41469[41469] - Fix handling of http_endpoint request exceeding memory limits. {issue}41764[41764] {pull}41765[41765] - Rate limiting fixes in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41583[41583] +- Redact authorization headers in HTTPJSON debug logs. {pull}41920[41920] +- Further rate limiting fix in the Okta provider of the Entity Analytics input. {issue}40106[40106] {pull}41977[41977] +- Fix streaming input handling of invalid or empty websocket messages. {pull}42036[42036] *Heartbeat* diff --git a/x-pack/filebeat/input/streaming/websocket.go b/x-pack/filebeat/input/streaming/websocket.go index 8a8cf76fe361..225bc76e8d9f 100644 --- a/x-pack/filebeat/input/streaming/websocket.go +++ b/x-pack/filebeat/input/streaming/websocket.go @@ -118,25 +118,25 @@ func (s *websocketStream) FollowStream(ctx context.Context) error { _, message, err := c.ReadMessage() if err != nil { s.metrics.errorsTotal.Inc() - if isRetryableError(err) { - s.log.Debugw("websocket connection encountered an error, attempting to reconnect...", "error", err) - // close the old connection and reconnect - if err := c.Close(); err != nil { - s.metrics.errorsTotal.Inc() - s.log.Errorw("encountered an error while closing the websocket connection", "error", err) - } - // since c is already a pointer, we can reassign it to the new connection and the defer func will still handle it - c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) - handleConnectionResponse(resp, s.metrics, s.log) - if err != nil { - s.metrics.errorsTotal.Inc() - s.log.Errorw("failed to reconnect websocket connection", "error", err) - return err - } - } else { + if !isRetryableError(err) { s.log.Errorw("failed to read websocket data", "error", err) return err } + s.log.Debugw("websocket connection encountered an error, attempting to reconnect...", "error", err) + // close the old connection and reconnect + if err := c.Close(); err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("encountered an error while closing the websocket connection", "error", err) + } + // since c is already a pointer, we can reassign it to the new connection and the defer func will still handle it + c, resp, err = connectWebSocket(ctx, s.cfg, url, s.log) + handleConnectionResponse(resp, s.metrics, s.log) + if err != nil { + s.metrics.errorsTotal.Inc() + s.log.Errorw("failed to reconnect websocket connection", "error", err) + return err + } + continue } s.metrics.receivedBytesTotal.Add(uint64(len(message))) state["response"] = message