diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index 3b087d19a9d..2e918d757dc 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -168,6 +168,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
 - Pin PyYAML version to 5.3.1 to avoid CI errors temporarily {pull}36091[36091]
 - Skip dependabot updates for github.com/elastic/mito. {pull}36158[36158]
 - Add device handling to Okta API package for entity analytics. {pull}35980[35980]
+- Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493]

 ==== Deprecated

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index f92d2944c70..6a1d926ab40 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -82,8 +82,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p

 		if len(r.requestFactories) == 1 {
 			finalResps = append(finalResps, httpResp)
-			events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true)
-			n = processAndPublishEvents(trCtx, events, publisher, true, r.log)
+			p := newPublisher(trCtx, publisher, true, r.log)
+			r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, p)
+			n = p.eventCount()
 			continue
 		}

@@ -118,8 +119,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 				return err
 			}
 			// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
-			events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false)
-			n = processAndPublishEvents(trCtx, events, publisher, false, r.log)
+			p := newPublisher(trCtx, publisher, false, r.log)
+			r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, p)
+			n = p.eventCount()
 		} else {
 			if len(ids) == 0 {
 				n = 0
@@ -187,13 +189,13 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 					resps = intermediateResps
 				}

-				var events <-chan maybeMsg
+				p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
 				if rf.isChain {
-					events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
+					rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
 				} else {
-					events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true)
+					r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, p)
 				}
-				n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
+				n += p.eventCount()
 			}
 		}

@@ -541,49 +543,71 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl

 // processRemainingChainEvents, processes the remaining pagination events for chain blocks
 func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
 	// we start from 0, and skip the 1st event since we have already processed it
-	events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true)
+	p := newChainProcessor(r, trCtx, publisher, chainIndex)
+	r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, p)
+	return p.eventCount()
+}

-	var n int
-	var eventCount int
-	for maybeMsg := range events {
-		if maybeMsg.failed() {
-			r.log.Errorf("error processing response: %v", maybeMsg)
-			continue
-		}
+type chainProcessor struct {
+	req  *requester
+	ctx  *transformContext
+	pub  inputcursor.Publisher
+	idx  int
+	tail bool
+	n    int
+}

-		if n >= 1 { // skip 1st event as it has already ben processed before
-			var response http.Response
-			response.StatusCode = 200
-			body := new(bytes.Buffer)
-			// we construct a new response here from each of the pagination events
-			err := json.NewEncoder(body).Encode(maybeMsg.msg)
-			if err != nil {
-				r.log.Errorf("error processing chain event: %w", err)
-				continue
-			}
-			response.Body = io.NopCloser(body)
+func newChainProcessor(req *requester, trCtx *transformContext, pub inputcursor.Publisher, idx int) *chainProcessor {
+	return &chainProcessor{
+		req: req,
+		ctx: trCtx,
+		pub: pub,
+		idx: idx,
+	}
+}

-			// updates the cursor for pagination last_event & last_response when chaining is present
-			trCtx.updateLastEvent(maybeMsg.msg)
-			trCtx.updateCursor()
+func (p *chainProcessor) event(ctx context.Context, msg mapstr.M) {
+	if !p.tail {
+		// Skip first event as it has already been processed.
+		p.tail = true
+		return
+	}

-			// for each pagination response, we repeat all the chain steps / blocks
-			count, err := r.processChainPaginationEvents(stdCtx, trCtx, publisher, &response, chainIndex, r.log)
-			if err != nil {
-				r.log.Errorf("error processing chain event: %w", err)
-				continue
-			}
-			eventCount += count
+	var response http.Response
+	response.StatusCode = 200
+	body := new(bytes.Buffer)
+	// we construct a new response here from each of the pagination events
+	err := json.NewEncoder(body).Encode(msg)
+	if err != nil {
+		p.req.log.Errorf("error processing chain event: %w", err)
+		return
+	}
+	response.Body = io.NopCloser(body)

-			err = response.Body.Close()
-			if err != nil {
-				r.log.Errorf("error closing http response body: %w", err)
-			}
-		}
+	// updates the cursor for pagination last_event & last_response when chaining is present
+	p.ctx.updateLastEvent(msg)
+	p.ctx.updateCursor()

-		n++
+	// for each pagination response, we repeat all the chain steps / blocks
+	n, err := p.req.processChainPaginationEvents(ctx, p.ctx, p.pub, &response, p.idx, p.req.log)
+	if err != nil {
+		p.req.log.Errorf("error processing chain event: %w", err)
+		return
 	}
-	return eventCount
+	p.n += n
+
+	err = response.Body.Close()
+	if err != nil {
+		p.req.log.Errorf("error closing http response body: %w", err)
+	}
+}
+
+func (p *chainProcessor) fail(err error) {
+	p.req.log.Errorf("error processing response: %v", err)
+}
+
+func (p *chainProcessor) eventCount() int {
+	return p.n
 }

 // processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input
@@ -675,8 +699,9 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
 			}
 			resps = intermediateResps
 		}
-		events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
-		n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
+		p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
+		rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
+		n += p.eventCount()
 	}

 	defer func() {
@@ -697,36 +722,52 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {
 	return *newUrl, nil
 }

-// processAndPublishEvents process and publish events based on event type
-func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, publisher inputcursor.Publisher, publish bool, log *logp.Logger) int {
-	var n int
-	for maybeMsg := range events {
-		if maybeMsg.failed() {
-			log.Errorf("error processing response: %v", maybeMsg)
-			continue
-		}
+type publisher struct {
+	ctx *transformContext
+	pub inputcursor.Publisher
+	n   int
+	log *logp.Logger
+}

-		if publish {
-			event, err := makeEvent(maybeMsg.msg)
-			if err != nil {
-				log.Errorf("error creating event: %v", maybeMsg)
-				continue
-			}
+func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher {
+	if !publish {
+		pub = nil
+	}
+	return &publisher{
+		ctx: trCtx,
+		pub: pub,
+		log: log,
+	}
+}

-			if err := publisher.Publish(event, trCtx.cursorMap()); err != nil {
-				log.Errorf("error publishing event: %v", err)
-				continue
-			}
-		}
-		if len(*trCtx.firstEventClone()) == 0 {
-			trCtx.updateFirstEvent(maybeMsg.msg)
+func (p *publisher) event(_ context.Context, msg mapstr.M) {
+	if p.pub != nil {
+		event, err := makeEvent(msg)
+		if err != nil {
+			p.log.Errorf("error creating event: %v: %v", msg, err)
+			return
 		}
-		trCtx.updateLastEvent(maybeMsg.msg)
-		trCtx.updateCursor()
-		n++
+		if err := p.pub.Publish(event, p.ctx.cursorMap()); err != nil {
+			p.log.Errorf("error publishing event: %v", err)
+			return
+		}
+	}
+	if len(*p.ctx.firstEventClone()) == 0 {
+		p.ctx.updateFirstEvent(msg)
 	}
-	return n
+	p.ctx.updateLastEvent(msg)
+	p.ctx.updateCursor()
+
+	p.n++
+}
+
+func (p *publisher) fail(err error) {
+	p.log.Errorf("error processing response: %v", err)
+}
+
+func (p *publisher) eventCount() int {
+	return p.n
 }

 const (
diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go
index 7adfd956fa2..2b8c5de5f4e 100644
--- a/x-pack/filebeat/input/httpjson/response.go
+++ b/x-pack/filebeat/input/httpjson/response.go
@@ -180,83 +180,81 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe
 	return rp
 }

-func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool) <-chan maybeMsg {
+type sendStream interface {
+	event(context.Context, mapstr.M)
+	fail(error)
+}
+
+func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) {
 	trCtx.clearIntervalData()

-	ch := make(chan maybeMsg)
-	go func() {
-		defer close(ch)
-		var npages int64
-
-		for i, httpResp := range resps {
-			iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
-			for {
-				pageStartTime := time.Now()
-				page, hasNext, err := iter.next()
-				if err != nil {
-					ch <- maybeMsg{err: err}
-					return
-				}
+	var npages int64
+	for i, httpResp := range resps {
+		iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
+		for {
+			pageStartTime := time.Now()
+			page, hasNext, err := iter.next()
+			if err != nil {
+				ch.fail(err)
+				return
+			}

-				if !hasNext {
-					if i+1 != len(resps) {
-						break
-					}
-					return
+			if !hasNext {
+				if i+1 != len(resps) {
+					break
 				}
+				return
+			}

-				respTrs := page.asTransformables(rp.log)
+			respTrs := page.asTransformables(rp.log)

-				if len(respTrs) == 0 {
-					return
-				}
+			if len(respTrs) == 0 {
+				return
+			}

-				// last_response context object is updated here organically
-				trCtx.updateLastResponse(*page)
-				npages = page.page
+			// last_response context object is updated here organically
+			trCtx.updateLastResponse(*page)
+			npages = page.page

-				rp.log.Debugf("last received page: %#v", trCtx.lastResponse)
+			rp.log.Debugf("last received page: %#v", trCtx.lastResponse)

-				for _, tr := range respTrs {
-					for _, t := range rp.transforms {
-						tr, err = t.run(trCtx, tr)
-						if err != nil {
-							ch <- maybeMsg{err: err}
-							return
-						}
+			for _, tr := range respTrs {
+				for _, t := range rp.transforms {
+					tr, err = t.run(trCtx, tr)
+					if err != nil {
+						ch.fail(err)
+						return
 					}
+				}

-					if rp.split == nil {
-						ch <- maybeMsg{msg: tr.body()}
-						rp.log.Debug("no split found: continuing")
-						continue
-					}
+				if rp.split == nil {
+					ch.event(stdCtx, tr.body())
+					rp.log.Debug("no split found: continuing")
+					continue
+				}

-					if err := rp.split.run(trCtx, tr, ch); err != nil {
-						switch err { //nolint:errorlint // run never returns a wrapped error.
-						case errEmptyField:
-							// nothing else to send for this page
-							rp.log.Debug("split operation finished")
-						case errEmptyRootField:
-							// root field not found, most likely the response is empty
-							rp.log.Debug(err)
-						default:
-							rp.log.Debug("split operation failed")
-							ch <- maybeMsg{err: err}
-							return
-						}
+				if err := rp.split.run(trCtx, tr, ch); err != nil {
+					switch err { //nolint:errorlint // run never returns a wrapped error.
+					case errEmptyField:
+						// nothing else to send for this page
+						rp.log.Debug("split operation finished")
+					case errEmptyRootField:
+						// root field not found, most likely the response is empty
+						rp.log.Debug(err)
+					default:
+						rp.log.Debug("split operation failed")
+						ch.fail(err)
+						return
 					}
 				}
+			}

-				rp.metrics.updatePageExecutionTime(pageStartTime)
+			rp.metrics.updatePageExecutionTime(pageStartTime)

-				if !paginate {
-					break
-				}
+			if !paginate {
+				break
 			}
 		}
-		rp.metrics.updatePagesPerInterval(npages)
-	}()
-
-	return ch
+	}
+	rp.metrics.updatePagesPerInterval(npages)
 }
diff --git a/x-pack/filebeat/input/httpjson/split.go b/x-pack/filebeat/input/httpjson/split.go
index 8fa892a6ce0..dc77fbb0fde 100644
--- a/x-pack/filebeat/input/httpjson/split.go
+++ b/x-pack/filebeat/input/httpjson/split.go
@@ -5,6 +5,7 @@
 package httpjson

 import (
+	"context"
 	"errors"
 	"fmt"
 	"strings"
@@ -95,13 +96,15 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
 // run runs the split operation on the contents of resp, sending successive
 // split results on ch. ctx is passed to transforms that are called during
 // the split.
-func (s *split) run(ctx *transformContext, resp transformable, ch chan<- maybeMsg) error {
+func (s *split) run(ctx *transformContext, resp transformable, events sendStream) error {
 	root := resp.body()
-	return s.split(ctx, root, ch)
+	return s.split(ctx, root, events)
 }

 // split recursively executes the split processor chain.
-func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg) error {
+func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) error {
+	todo := context.TODO()
+
 	v, err := root.GetValue(s.targetInfo.Name)
 	if err != nil && err != mapstr.ErrKeyNotFound { //nolint:errorlint // mapstr.ErrKeyNotFound is never wrapped by GetValue.
 		return err
@@ -110,21 +113,21 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg)
 	if v == nil {
 		if s.ignoreEmptyValue {
 			if s.child != nil {
-				return s.child.split(ctx, root, ch)
+				return s.child.split(ctx, root, events)
 			}
 			if s.keepParent {
-				ch <- maybeMsg{msg: root}
+				events.event(todo, root)
 			}
 			return nil
 		}
 		if s.isRoot {
 			if s.keepParent {
-				ch <- maybeMsg{msg: root}
+				events.event(todo, root)
 				return errEmptyField
 			}
 			return errEmptyRootField
 		}
-		ch <- maybeMsg{msg: root}
+		events.event(todo, root)
 		return errEmptyField
 	}

@@ -138,23 +141,23 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg)
 		if len(varr) == 0 {
 			if s.ignoreEmptyValue {
 				if s.child != nil {
-					return s.child.split(ctx, root, ch)
+					return s.child.split(ctx, root, events)
 				}
 				if s.keepParent {
-					ch <- maybeMsg{msg: root}
+					events.event(todo, root)
 				}
 				return nil
 			}
 			if s.isRoot {
-				ch <- maybeMsg{msg: root}
+				events.event(todo, root)
 				return errEmptyRootField
 			}
-			ch <- maybeMsg{msg: root}
+			events.event(todo, root)
 			return errEmptyField
 		}

 		for _, e := range varr {
-			err := s.sendMessage(ctx, root, s.targetInfo.Name, e, ch)
+			err := s.sendMessage(ctx, root, s.targetInfo.Name, e, events)
 			if err != nil {
 				s.log.Debug(err)
 			}
@@ -170,22 +173,22 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg)
 		if len(vmap) == 0 {
 			if s.ignoreEmptyValue {
 				if s.child != nil {
-					return s.child.split(ctx, root, ch)
+					return s.child.split(ctx, root, events)
 				}
 				if s.keepParent {
-					ch <- maybeMsg{msg: root}
+					events.event(todo, root)
 				}
 				return nil
 			}
 			if s.isRoot {
 				return errEmptyRootField
 			}
-			ch <- maybeMsg{msg: root}
+			events.event(todo, root)
 			return errEmptyField
 		}

 		for k, e := range vmap {
-			if err := s.sendMessage(ctx, root, k, e, ch); err != nil {
+			if err := s.sendMessage(ctx, root, k, e, events); err != nil {
 				s.log.Debug(err)
 			}
 		}
@@ -200,18 +203,18 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg)
 		if len(vstr) == 0 {
 			if s.ignoreEmptyValue {
 				if s.child != nil {
-					return s.child.split(ctx, root, ch)
+					return s.child.split(ctx, root, events)
 				}
 				return nil
 			}
 			if s.isRoot {
 				return errEmptyRootField
 			}
-			ch <- maybeMsg{msg: root}
+			events.event(todo, root)
 			return errEmptyField
 		}

 		for _, substr := range strings.Split(vstr, s.delimiter) {
-			if err := s.sendMessageSplitString(ctx, root, substr, ch); err != nil {
+			if err := s.sendMessageSplitString(ctx, root, substr, events); err != nil {
 				s.log.Debug(err)
 			}
 		}
@@ -224,7 +227,7 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg)

 // sendMessage sends an array or map split result value, v, on ch after performing
 // any necessary transformations. If key is "", the value is an element of an array.
-func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v interface{}, ch chan<- maybeMsg) error {
+func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v interface{}, events sendStream) error {
 	obj, ok := toMapStr(v, s.targetInfo.Name)
 	if !ok {
 		return errExpectedSplitObj
@@ -252,10 +255,10 @@ func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v
 	}

 	if s.child != nil {
-		return s.child.split(ctx, clone, ch)
+		return s.child.split(ctx, clone, events)
 	}

-	ch <- maybeMsg{msg: clone}
+	events.event(context.TODO(), clone)

 	return nil
 }
@@ -277,7 +280,7 @@ func toMapStr(v interface{}, key string) (mapstr.M, bool) {

 // sendMessage sends a string split result value, v, on ch after performing any
 // necessary transformations. If key is "", the value is an element of an array.
-func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v string, ch chan<- maybeMsg) error {
+func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v string, events sendStream) error {
 	clone := root.Clone()
 	_, _ = clone.Put(s.targetInfo.Name, v)

@@ -293,10 +296,10 @@ func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v s
 	}

 	if s.child != nil {
-		return s.child.split(ctx, clone, ch)
+		return s.child.split(ctx, clone, events)
 	}

-	ch <- maybeMsg{msg: clone}
+	events.event(context.TODO(), clone)

 	return nil
 }
diff --git a/x-pack/filebeat/input/httpjson/split_test.go b/x-pack/filebeat/input/httpjson/split_test.go
index 2c4553e9df6..10f7a40567d 100644
--- a/x-pack/filebeat/input/httpjson/split_test.go
+++ b/x-pack/filebeat/input/httpjson/split_test.go
@@ -5,6 +5,7 @@
 package httpjson

 import (
+	"context"
 	"testing"

 	"github.com/stretchr/testify/assert"
@@ -702,24 +703,33 @@ func TestSplit(t *testing.T) {
 	}

 	for _, tc := range cases {
-		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
-			ch := make(chan maybeMsg, len(tc.expectedMessages))
+			events := &stream{t: t}
 			split, err := newSplitResponse(tc.config, logp.NewLogger(""))
 			assert.NoError(t, err)
-			err = split.run(tc.ctx, tc.resp, ch)
+			err = split.run(tc.ctx, tc.resp, events)
 			if tc.expectedErr == nil {
 				assert.NoError(t, err)
 			} else {
 				assert.EqualError(t, err, tc.expectedErr.Error())
 			}
-			close(ch)
-			assert.Equal(t, len(tc.expectedMessages), len(ch))
-			for _, msg := range tc.expectedMessages {
-				e := <-ch
-				assert.NoError(t, e.err)
-				assert.Equal(t, msg.Flatten(), e.msg.Flatten())
+			assert.Equal(t, len(tc.expectedMessages), len(events.collected))
+			for i, msg := range tc.expectedMessages {
+				assert.Equal(t, msg.Flatten(), events.collected[i].Flatten())
 			}
 		})
 	}
 }
+
+type stream struct {
+	collected []mapstr.M
+	t         *testing.T
+}
+
+func (s *stream) event(_ context.Context, msg mapstr.M) {
+	s.collected = append(s.collected, msg)
+}
+
+func (s *stream) fail(err error) {
+	s.t.Errorf("fail: %v", err)
+}
diff --git a/x-pack/filebeat/input/httpjson/transform.go b/x-pack/filebeat/input/httpjson/transform.go
index d4055889bf0..be9e938756e 100644
--- a/x-pack/filebeat/input/httpjson/transform.go
+++ b/x-pack/filebeat/input/httpjson/transform.go
@@ -10,7 +10,6 @@ import (
 	"net/http"
 	"net/url"
 	"strconv"
-	"sync"

 	"github.com/elastic/beats/v7/libbeat/common"
 	conf "github.com/elastic/elastic-agent-libs/config"
@@ -25,7 +24,6 @@ type transformsConfig []*conf.C
 type transforms []transform

 type transformContext struct {
-	lock          sync.RWMutex
 	cursor        *cursor
 	parentTrCtx   *transformContext
 	firstEvent    *mapstr.M
@@ -45,41 +43,28 @@ func emptyTransformContext() *transformContext {
 }

 func (ctx *transformContext) cursorMap() mapstr.M {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
 	return ctx.cursor.clone()
 }

 func (ctx *transformContext) lastEventClone() *mapstr.M {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
 	clone := ctx.lastEvent.Clone()
 	return &clone
 }

 func (ctx *transformContext) firstEventClone() *mapstr.M {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
 	clone := ctx.firstEvent.Clone()
 	return &clone
 }

 func (ctx *transformContext) firstResponseClone() *response {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
 	return ctx.firstResponse.clone()
 }

 func (ctx *transformContext) lastResponseClone() *response {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
 	return ctx.lastResponse.clone()
 }

 func (ctx *transformContext) updateCursor() {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
-
 	// we do not want to pass the cursor data to itself
 	newCtx := emptyTransformContext()
 	newCtx.lastEvent = ctx.lastEvent
@@ -91,8 +76,6 @@ func (ctx *transformContext) updateCursor() {
 }

 func (ctx *transformContext) clone() *transformContext {
-	ctx.lock.Lock()
-
 	newCtx := emptyTransformContext()
 	newCtx.lastEvent = ctx.lastEvent
@@ -100,41 +83,29 @@ func (ctx *transformContext) clone() *transformContext {
 	newCtx.firstEvent = ctx.firstEvent
 	newCtx.lastResponse = ctx.lastResponse
 	newCtx.firstResponse = ctx.firstResponse
 	newCtx.cursor = ctx.cursor
 	newCtx.parentTrCtx = ctx
-
-	ctx.lock.Unlock()
 	return newCtx
 }

 func (ctx *transformContext) updateLastEvent(e mapstr.M) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	*ctx.lastEvent = e
 }

 func (ctx *transformContext) updateFirstEvent(e mapstr.M) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	*ctx.firstEvent = e
 }

 func (ctx *transformContext) updateLastResponse(r response) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	*ctx.lastResponse = r
 }

 func (ctx *transformContext) updateFirstResponse(r response) {
-	ctx.lock.Lock()
 	*ctx.firstResponse = r
-	ctx.lock.Unlock()
 }

 func (ctx *transformContext) clearIntervalData() {
-	ctx.lock.Lock()
 	ctx.lastEvent = &mapstr.M{}
 	ctx.firstEvent = &mapstr.M{}
 	ctx.lastResponse = &response{}
-	ctx.lock.Unlock()
 }

 type transformable mapstr.M
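A minimal sketch of how the sendStream interface introduced in this patch is consumed (illustrative only, not part of the diff; countingStream is a hypothetical name — publisher and chainProcessor in request.go are the implementations the patch actually adds):

// countingStream implements sendStream by collecting split results in
// memory, without the publishing and cursor bookkeeping that publisher does.
type countingStream struct {
	msgs []mapstr.M
	errs []error
}

func (s *countingStream) event(_ context.Context, msg mapstr.M) { s.msgs = append(s.msgs, msg) }
func (s *countingStream) fail(err error)                        { s.errs = append(s.errs, err) }

// Because startProcessing now runs on the caller's goroutine instead of a
// channel-fed worker, every event has been delivered when it returns:
//
//	s := &countingStream{}
//	rp.startProcessing(stdCtx, trCtx, resps, true, s)
//	n := len(s.msgs)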