Skip to content

Commit

Permalink
[Filebeat][CometD] Resolve Retry Error Handling (#34327)
Browse files Browse the repository at this point in the history
* resolve retry error handling

* add retry in input worker

* added unit test for testing EOF error retry in cometd input

* resolve golangci-lint errors

* introduce SObject extraction as well

* improved logging

* salesforce force fully closing connection for the case of timeouts

* channel creation for each new iteration to avoid unnecessary channel close panics
  • Loading branch information
kush-elastic authored Feb 22, 2023
1 parent 94ee984 commit 47723fe
Show file tree
Hide file tree
Showing 3 changed files with 343 additions and 43 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fixing system tests not returning expected content encoding for azure blob storage input. {pull}34412[34412]
- [Azure Logs] Fix authentication_processing_details parsing in sign-in logs. {issue}34330[34330] {pull}34478[34478]
- Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. {issue}34249[34249] {pull}34550[34550]
- Fix the issue of `cometd` input worker getting closed in case of a network connection issue and an EOF error. {issue}34326[34326] {pull}34327[34327]

*Heartbeat*

Expand Down
96 changes: 73 additions & 23 deletions x-pack/filebeat/input/cometd/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ package cometd
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"sync"
"time"

"golang.org/x/time/rate"

"github.com/elastic/beats/v7/filebeat/channel"
"github.com/elastic/beats/v7/filebeat/input"
"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -23,6 +27,9 @@ import (

const (
inputName = "cometd"

// retryInterval is the minimum duration between pub/sub client retries.
retryInterval = 30 * time.Second
)

// Run starts the input worker then returns. Only the first invocation
Expand All @@ -37,44 +44,90 @@ func (in *cometdInput) Run() {
defer in.workerWg.Done()
defer in.workerCancel()
in.b = bay.Bayeux{}
in.creds, err = bay.GetSalesforceCredentials(in.authParams)
if err != nil {
in.log.Errorw("not able to get access token", "error", err)
return
}
if err := in.run(); err != nil {
in.log.Errorw("got error while running input", "error", err)
return

rt := rate.NewLimiter(rate.Every(retryInterval), 1)

for in.workerCtx.Err() == nil {
// Rate limit.
if err := rt.Wait(in.workerCtx); err != nil {
continue
}

// Creating a new channel for cometd input.
in.msgCh = make(chan bay.MaybeMsg, 1)

in.creds, err = bay.GetSalesforceCredentials(in.authParams)
if err != nil {
in.log.Errorw("not able to get access token", "error", err)
continue
}

if err := in.run(); err != nil {
if in.workerCtx.Err() == nil {
in.log.Errorw("Restarting failed CometD input worker.", "error", err)
continue
}

// Log any non-cancellation error before stopping.
if !errors.Is(err, context.Canceled) {
in.log.Errorw("got error while running input", "error", err)
}
}
}
}()
})
}

func (in *cometdInput) run() error {
in.msgCh = in.b.Channel(in.workerCtx, in.msgCh, "-1", *in.creds, in.config.ChannelName)
ctx, cancel := context.WithCancel(in.workerCtx)
defer cancel()
// Ticker with 5 seconds to avoid log too many warnings
ticker := time.NewTicker(5 * time.Second)
in.msgCh = in.b.Channel(ctx, in.msgCh, "-1", *in.creds, in.config.ChannelName)
for e := range in.msgCh {
if e.Failed() {
return fmt.Errorf("error collecting events: %w", e.Err)
// if err bayeux library returns recoverable error, do not close input.
// instead continue with connection warning
if !strings.Contains(e.Error(), "trying again") {
return fmt.Errorf("error collecting events: %w", e.Err)
}
// log warning every 5 seconds only to avoid to many unnecessary logs
select {
case <-ticker.C:
in.log.Errorw("Retrying...! facing issue while collecting data from CometD", "error", e.Error())
default:
}
} else if !e.Msg.Successful {
var event event
// To handle the last response where the object received was empty
if e.Msg.Data.Payload == nil {
return nil
}

var msg []byte
var err error
// Convert json.RawMessage response to []byte
msg, err := e.Msg.Data.Payload.MarshalJSON()
if err != nil {
return fmt.Errorf("JSON error: %w", err)
if e.Msg.Data.Payload != nil {
msg, err = e.Msg.Data.Payload.MarshalJSON()
if err != nil {
in.log.Errorw("invalid JSON", "error", err)
continue
}
} else if e.Msg.Data.Object != nil {
msg, err = e.Msg.Data.Object.MarshalJSON()
if err != nil {
in.log.Errorw("invalid JSON", "error", err)
continue
}
} else {
// To handle the last response where the object received was empty
return nil
}

// Extract event IDs from json.RawMessage
err = json.Unmarshal(e.Msg.Data.Payload, &event)
err = json.Unmarshal(msg, &event)
if err != nil {
return fmt.Errorf("error while parsing JSON: %w", err)
in.log.Errorw("error while parsing JSON", "error", err)
continue
}
if ok := in.outlet.OnEvent(makeEvent(event.EventId, e.Msg.Channel, string(msg))); !ok {
in.log.Debug("OnEvent returned false. Stopping input worker.")
cancel()
return fmt.Errorf("error ingesting data to elasticsearch")
}
}
Expand Down Expand Up @@ -133,9 +186,6 @@ func NewInput(
authParams: authParams,
}

// Creating a new channel for cometd input.
in.msgCh = make(chan bay.MaybeMsg, 1)

// Build outlet for events.
in.outlet, err = connector.Connect(cfg)
if err != nil {
Expand Down
Loading

0 comments on commit 47723fe

Please sign in to comment.