From 47723fe5e1ac50d43fe251e794b49dfb0ce159b8 Mon Sep 17 00:00:00 2001 From: Kush Rana <89848966+kush-elastic@users.noreply.github.com> Date: Wed, 22 Feb 2023 14:09:44 +0530 Subject: [PATCH] [Filebeat][CometD] Resolve Retry Error Handling (#34327) * resolve retry error handling * add retry in input worker * added unit test for testing EOF error retry in cometd input * resolve golangci-lint errors * introduce SObject extraction as well * improved logging * salesforce force fully closing connection for the case of timeouts * channel creation for each new iteration to avoid unnecessary channel close panics --- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/cometd/input.go | 96 +++++-- x-pack/filebeat/input/cometd/input_test.go | 289 +++++++++++++++++++-- 3 files changed, 343 insertions(+), 43 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 067d50b7f026..3a85efa42fbf 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -92,6 +92,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff] - Fixing system tests not returning expected content encoding for azure blob storage input. {pull}34412[34412] - [Azure Logs] Fix authentication_processing_details parsing in sign-in logs. {issue}34330[34330] {pull}34478[34478] - Prevent Elasticsearch from spewing log warnings about redundant wildcard when setting up ingest pipelines. {issue}34249[34249] {pull}34550[34550] +- Fix the issue of `cometd` input worker getting closed in case of a network connection issue and an EOF error. {issue}34326[34326] {pull}34327[34327] *Heartbeat* diff --git a/x-pack/filebeat/input/cometd/input.go b/x-pack/filebeat/input/cometd/input.go index 026a1c234633..c451af818e9f 100644 --- a/x-pack/filebeat/input/cometd/input.go +++ b/x-pack/filebeat/input/cometd/input.go @@ -7,10 +7,14 @@ package cometd import ( "context" "encoding/json" + "errors" "fmt" + "strings" "sync" "time" + "golang.org/x/time/rate" + "github.com/elastic/beats/v7/filebeat/channel" "github.com/elastic/beats/v7/filebeat/input" "github.com/elastic/beats/v7/libbeat/beat" @@ -23,6 +27,9 @@ import ( const ( inputName = "cometd" + + // retryInterval is the minimum duration between pub/sub client retries. + retryInterval = 30 * time.Second ) // Run starts the input worker then returns. Only the first invocation @@ -37,44 +44,90 @@ func (in *cometdInput) Run() { defer in.workerWg.Done() defer in.workerCancel() in.b = bay.Bayeux{} - in.creds, err = bay.GetSalesforceCredentials(in.authParams) - if err != nil { - in.log.Errorw("not able to get access token", "error", err) - return - } - if err := in.run(); err != nil { - in.log.Errorw("got error while running input", "error", err) - return + + rt := rate.NewLimiter(rate.Every(retryInterval), 1) + + for in.workerCtx.Err() == nil { + // Rate limit. + if err := rt.Wait(in.workerCtx); err != nil { + continue + } + + // Creating a new channel for cometd input. + in.msgCh = make(chan bay.MaybeMsg, 1) + + in.creds, err = bay.GetSalesforceCredentials(in.authParams) + if err != nil { + in.log.Errorw("not able to get access token", "error", err) + continue + } + + if err := in.run(); err != nil { + if in.workerCtx.Err() == nil { + in.log.Errorw("Restarting failed CometD input worker.", "error", err) + continue + } + + // Log any non-cancellation error before stopping. + if !errors.Is(err, context.Canceled) { + in.log.Errorw("got error while running input", "error", err) + } + } } }() }) } func (in *cometdInput) run() error { - in.msgCh = in.b.Channel(in.workerCtx, in.msgCh, "-1", *in.creds, in.config.ChannelName) + ctx, cancel := context.WithCancel(in.workerCtx) + defer cancel() + // Ticker with 5 seconds to avoid log too many warnings + ticker := time.NewTicker(5 * time.Second) + in.msgCh = in.b.Channel(ctx, in.msgCh, "-1", *in.creds, in.config.ChannelName) for e := range in.msgCh { if e.Failed() { - return fmt.Errorf("error collecting events: %w", e.Err) + // if err bayeux library returns recoverable error, do not close input. + // instead continue with connection warning + if !strings.Contains(e.Error(), "trying again") { + return fmt.Errorf("error collecting events: %w", e.Err) + } + // log warning every 5 seconds only to avoid to many unnecessary logs + select { + case <-ticker.C: + in.log.Errorw("Retrying...! facing issue while collecting data from CometD", "error", e.Error()) + default: + } } else if !e.Msg.Successful { var event event - // To handle the last response where the object received was empty - if e.Msg.Data.Payload == nil { - return nil - } - + var msg []byte + var err error // Convert json.RawMessage response to []byte - msg, err := e.Msg.Data.Payload.MarshalJSON() - if err != nil { - return fmt.Errorf("JSON error: %w", err) + if e.Msg.Data.Payload != nil { + msg, err = e.Msg.Data.Payload.MarshalJSON() + if err != nil { + in.log.Errorw("invalid JSON", "error", err) + continue + } + } else if e.Msg.Data.Object != nil { + msg, err = e.Msg.Data.Object.MarshalJSON() + if err != nil { + in.log.Errorw("invalid JSON", "error", err) + continue + } + } else { + // To handle the last response where the object received was empty + return nil } // Extract event IDs from json.RawMessage - err = json.Unmarshal(e.Msg.Data.Payload, &event) + err = json.Unmarshal(msg, &event) if err != nil { - return fmt.Errorf("error while parsing JSON: %w", err) + in.log.Errorw("error while parsing JSON", "error", err) + continue } if ok := in.outlet.OnEvent(makeEvent(event.EventId, e.Msg.Channel, string(msg))); !ok { in.log.Debug("OnEvent returned false. Stopping input worker.") + cancel() return fmt.Errorf("error ingesting data to elasticsearch") } } @@ -133,9 +186,6 @@ func NewInput( authParams: authParams, } - // Creating a new channel for cometd input. - in.msgCh = make(chan bay.MaybeMsg, 1) - // Build outlet for events. in.outlet, err = connector.Connect(cfg) if err != nil { diff --git a/x-pack/filebeat/input/cometd/input_test.go b/x-pack/filebeat/input/cometd/input_test.go index 5c1cebdc7195..6f0814753a42 100644 --- a/x-pack/filebeat/input/cometd/input_test.go +++ b/x-pack/filebeat/input/cometd/input_test.go @@ -6,10 +6,13 @@ package cometd import ( "context" + "encoding/json" "fmt" - "io/ioutil" + "io" + "net" "net/http" "net/http/httptest" + "strings" "sync" "sync/atomic" "testing" @@ -270,7 +273,43 @@ func TestMultiInput(t *testing.T) { } // create Server - r := http.HandlerFunc(oauth2Handler) + r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + _ = r.ParseForm() + if getTokenHandler(w, r) { + return + } + body, _ := io.ReadAll(r.Body) + data := getBayData(body) + + switch data.Channel { + case "/meta/handshake": + _, _ = w.Write([]byte(`[{"ext":{"replay":true,"payload.format":true},"minimumVersion":"1.0","clientId":"client_id","supportedConnectionTypes":["long-polling"],"channel":"/meta/handshake","version":"1.0","successful":true}]`)) + return + case "/meta/connect": + if called < uint64(expectedHTTPEventCount) { + if called == 0 { + atomic.AddUint64(&called, 1) + _, _ = w.Write([]byte(`[{"data": {"payload": {"CountryIso": "IN"}, "event": {"replayId":1234}}, "channel": "channel_name"}]`)) + return + } else if called == 1 { + atomic.AddUint64(&called, 1) + _, _ = w.Write([]byte(`[{"data": {"sobject": {"CountryIso": "IN"}, "event": {"replayId":1234}}, "channel": "channel_name"}]`)) + return + } + } + _, _ = w.Write([]byte(`{}`)) + return + case "/meta/subscribe": + if called == 0 { + _, _ = w.Write([]byte(`[{"clientId": "client_id", "channel": "/meta/subscribe", "subscription": "channel_name", "successful":true}]`)) + } else if called == 1 { + _, _ = w.Write([]byte(`[{"clientId": "client_id", "channel": "/meta/subscribe", "subscription": "channel_name1", "successful":true}]`)) + } + return + default: + } + }) server := httptest.NewServer(r) defer server.Close() serverURL = server.URL @@ -278,6 +317,7 @@ func TestMultiInput(t *testing.T) { // get common config cfg1 := conf.MustNewConfigFrom(config) + config["channel_name"] = "channel_name1" cfg2 := conf.MustNewConfigFrom(config) var inputContext finput.Context @@ -304,7 +344,9 @@ func TestMultiInput(t *testing.T) { defer input2.Stop() for _, event := range []beat.Event{<-eventsCh, <-eventsCh} { - assertEventMatches(t, expected1, event) + message, err := event.GetValue("message") + require.NoError(t, err) + require.Equal(t, string(expected1.Msg.Data.Payload), message) got++ } }() @@ -317,33 +359,25 @@ func TestMultiInput(t *testing.T) { func oauth2Handler(w http.ResponseWriter, r *http.Request) { w.Header().Set("content-type", "application/json") _ = r.ParseForm() - if r.URL.Path == "/token" { - response := `{"instance_url": "` + serverURL + `", "expires_in": "60", "access_token": "abcd"}` - _, _ = w.Write([]byte(response)) + if getTokenHandler(w, r) { return } - body, _ := ioutil.ReadAll(r.Body) + body, _ := io.ReadAll(r.Body) + data := getBayData(body) - switch string(body) { - case `{"channel": "/meta/handshake", "supportedConnectionTypes": ["long-polling"], "version": "1.0"}`: + switch data.Channel { + case "/meta/handshake": _, _ = w.Write([]byte(`[{"ext":{"replay":true,"payload.format":true},"minimumVersion":"1.0","clientId":"client_id","supportedConnectionTypes":["long-polling"],"channel":"/meta/handshake","version":"1.0","successful":true}]`)) return - case `{"channel": "/meta/connect", "connectionType": "long-polling", "clientId": "client_id"} `: + case "/meta/connect": if called < uint64(expectedHTTPEventCount) { atomic.AddUint64(&called, 1) _, _ = w.Write([]byte(`[{"data": {"payload": {"CountryIso": "IN"}, "event": {"replayId":1234}}, "channel": "channel_name"}]`)) - } else { - _, _ = w.Write([]byte(`{}`)) + return } + _, _ = w.Write([]byte(`{}`)) return - case `{ - "channel": "/meta/subscribe", - "subscription": "channel_name", - "clientId": "client_id", - "ext": { - "replay": {"channel_name": "-1"} - } - }`: + case "/meta/subscribe": _, _ = w.Write([]byte(`[{"clientId": "client_id", "channel": "/meta/subscribe", "subscription": "channel_name", "successful":true}]`)) return default: @@ -355,3 +389,218 @@ func assertEventMatches(t *testing.T, expected bay.MaybeMsg, got beat.Event) { require.NoError(t, err) require.Equal(t, string(expected.Msg.Data.Payload), message) } + +func TestMultiEventForEOFRetryHandlerInput(t *testing.T) { + var err error + + errorAfterEvent := 2 + expectedHTTPEventCount := 6 + expectedEventCount := 4 + + eventsCh := make(chan beat.Event) + defer close(eventsCh) + signal := make(chan struct{}, 1) + defer close(signal) + + outlet := &mockedOutleter{ + onEventHandler: func(event beat.Event) bool { + eventsCh <- event + return true + }, + } + connector := &mockedConnector{ + outlet: outlet, + } + var inputContext finput.Context + + var expected bay.MaybeMsg + expected.Msg.Data.Event.ReplayID = 1234 + expected.Msg.Data.Payload = []byte(`{"CountryIso": "IN"}`) + expected.Msg.Channel = "channel_name" + + config := map[string]interface{}{ + "channel_name": "channel_name", + "auth.oauth2.client.id": "client.id", + "auth.oauth2.client.secret": "client.secret", + "auth.oauth2.user": "user", + "auth.oauth2.password": "password", + } + + i := 0 + var server *httptest.Server + r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + _ = r.ParseForm() + if getTokenHandler(w, r) { + return + } + body, _ := io.ReadAll(r.Body) + data := getBayData(body) + + switch data.Channel { + case "/meta/handshake": + _, _ = w.Write([]byte(`[{"ext":{"replay":true,"payload.format":true},"minimumVersion":"1.0","clientId":"client_id","supportedConnectionTypes":["long-polling"],"channel":"/meta/handshake","version":"1.0","successful":true}]`)) + return + case "/meta/connect": + if i < expectedHTTPEventCount { + if i == errorAfterEvent { + // stop server to produce EOF errors + signal <- struct{}{} + } + i++ + _, _ = w.Write([]byte(`[{"data": {"payload": {"CountryIso": "IN"}, "event": {"replayId":1234}}, "channel": "channel_name"}]`)) + return + } + i++ + _, _ = w.Write([]byte(`{}`)) + return + case "/meta/subscribe": + _, _ = w.Write([]byte(`[{"clientId": "client_id", "channel": "/meta/subscribe", "subscription": "channel_name", "successful":true}]`)) + return + default: + } + }) + + server, err = newTestServer("", r) + assert.NoError(t, err) + serverURL = server.URL + + config["auth.oauth2.token_url"] = server.URL + "/token" + + cfg := conf.MustNewConfigFrom(config) + + input, err := NewInput(cfg, connector, inputContext) + require.NoError(t, err) + require.NotNil(t, input) + + input.Run() + go func() { + j := 0 + for event := range eventsCh { + if j >= expectedEventCount { + signal <- struct{}{} + break + } + assertEventMatches(t, expected, event) + j++ + } + }() + + <-signal + // close previous connection + server.CloseClientConnections() + server.Close() + time.Sleep(100 * time.Millisecond) + + // restart connection for new events + server, err = newTestServer(strings.Split(serverURL, "http://")[1], r) + assert.NoError(t, err) + serverURL = server.URL + defer server.Close() + <-signal + + input.Stop() +} + +func newTestServer(URL string, handler http.Handler) (*httptest.Server, error) { + server := httptest.NewUnstartedServer(handler) + if URL != "" { + l, err := net.Listen("tcp", URL) + if err != nil { + return nil, err + } + server.Listener.Close() + server.Listener = l + } + server.Start() + return server, nil +} + +func TestNegativeCases(t *testing.T) { + expectedHTTPEventCount = 1 + defer atomic.StoreUint64(&called, 0) + eventsCh := make(chan beat.Event) + defer close(eventsCh) + + outlet := &mockedOutleter{ + onEventHandler: func(event beat.Event) bool { + eventsCh <- event + return true + }, + } + connector := &mockedConnector{ + outlet: outlet, + } + var inputContext finput.Context + + var expected bay.MaybeMsg + expected.Msg.Data.Event.ReplayID = 1234 + expected.Msg.Data.Payload = []byte(`{"CountryIso": "IN"}`) + expected.Msg.Channel = "channel_name" + + config := map[string]interface{}{ + "channel_name": "channel_name", + "auth.oauth2.client.id": "client.id", + "auth.oauth2.client.secret": "client.secret", + "auth.oauth2.user": "user", + "auth.oauth2.password": "password", + } + + r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("content-type", "application/json") + _ = r.ParseForm() + if getTokenHandler(w, r) { + return + } + body, _ := io.ReadAll(r.Body) + data := getBayData(body) + + switch data.Channel { + case "/meta/handshake": + _, _ = w.Write([]byte(`{}`)) + return + default: + } + }) + server := httptest.NewServer(r) + defer server.Close() + + serverURL = server.URL + config["auth.oauth2.token_url"] = server.URL + "/token" + + cfg := conf.MustNewConfigFrom(config) + + input, err := NewInput(cfg, connector, inputContext) + require.NoError(t, err) + require.NotNil(t, input) + + input.Run() + go func() { + <-eventsCh + assert.Error(t, fmt.Errorf("there should be no events")) + }() + + // wait for run to return error or event + time.Sleep(100 * time.Millisecond) + + input.Stop() +} + +func getTokenHandler(w http.ResponseWriter, r *http.Request) bool { + if r.URL.Path == "/token" { + response := `{"instance_url": "` + serverURL + `", "expires_in": "60", "access_token": "abcd"}` + _, _ = w.Write([]byte(response)) + return true + } + return false +} + +func getBayData(body []byte) *bay.Subscription { + var data bay.Subscription + err := json.Unmarshal(body, &data) + if err != nil { + return nil + } + + return &data +}