From 771337b3a2638edb0f640f5bc5e3340f07271a6a Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 09:33:35 +0930
Subject: [PATCH 01/16] x-pack/filebeat/input/httpjson: hide sends behind
 methods

---
 x-pack/filebeat/input/httpjson/request.go  | 20 +++++++++++--------
 x-pack/filebeat/input/httpjson/response.go | 23 ++++++++++++++--------
 2 files changed, 27 insertions(+), 16 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index f92d2944c70b..edb32ba1ad3d 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -82,7 +82,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 
 			if len(r.requestFactories) == 1 {
 				finalResps = append(finalResps, httpResp)
-				events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true)
+				events := make(stream)
+				r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, events)
 				n = processAndPublishEvents(trCtx, events, publisher, true, r.log)
 				continue
 			}
@@ -118,7 +119,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 				return err
 			}
 			// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
-			events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false)
+			events := make(stream)
+			r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, events)
 			n = processAndPublishEvents(trCtx, events, publisher, false, r.log)
 		} else {
 			if len(ids) == 0 {
@@ -187,11 +189,11 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 				resps = intermediateResps
 			}
 
-			var events <-chan maybeMsg
+			events := make(stream)
 			if rf.isChain {
-				events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
+				rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events)
 			} else {
-				events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true)
+				r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, events)
 			}
 			n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
 		}
@@ -541,7 +543,8 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl
 // processRemainingChainEvents, processes the remaining pagination events for chain blocks
 func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
 	// we start from 0, and skip the 1st event since we have already processed it
-	events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true)
+	events := make(stream)
+	r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, events)
 
 	var n int
 	var eventCount int
@@ -675,7 +678,8 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
 			}
 			resps = intermediateResps
 		}
-		events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
+		events := make(stream)
+		rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events)
 		n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
 	}
 
@@ -698,7 +702,7 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {
 }
 
 // processAndPublishEvents process and publish events based on event type
-func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, publisher inputcursor.Publisher, publish bool, log *logp.Logger) int {
+func processAndPublishEvents(trCtx *transformContext, events stream, publisher inputcursor.Publisher, publish bool, log *logp.Logger) int {
 	var n int
 	for maybeMsg := range events {
 		if maybeMsg.failed() {
diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go
index 7adfd956fa20..37e539195c0e 100644
--- a/x-pack/filebeat/input/httpjson/response.go
+++ b/x-pack/filebeat/input/httpjson/response.go
@@ -180,10 +180,19 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe
 	return rp
 }
 
-func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool) <-chan maybeMsg {
+type stream chan maybeMsg
+
+func (s stream) event(e mapstr.M) {
+	s <- maybeMsg{msg: e}
+}
+
+func (s stream) fail(err error) {
+	s <- maybeMsg{err: err}
+}
+
+func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch stream) {
 	trCtx.clearIntervalData()
 
-	ch := make(chan maybeMsg)
 	go func() {
 		defer close(ch)
 		var npages int64
@@ -194,7 +203,7 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
 				pageStartTime := time.Now()
 				page, hasNext, err := iter.next()
 				if err != nil {
-					ch <- maybeMsg{err: err}
+					ch.fail(err)
 					return
 				}
 
@@ -221,13 +230,13 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
 					for _, t := range rp.transforms {
 						tr, err = t.run(trCtx, tr)
 						if err != nil {
-							ch <- maybeMsg{err: err}
+							ch.fail(err)
 							return
 						}
 					}
 
 					if rp.split == nil {
-						ch <- maybeMsg{msg: tr.body()}
+						ch.event(tr.body())
 						rp.log.Debug("no split found: continuing")
 						continue
 					}
@@ -242,7 +251,7 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
 							rp.log.Debug(err)
 						default:
 							rp.log.Debug("split operation failed")
-							ch <- maybeMsg{err: err}
+							ch.fail(err)
 							return
 						}
 					}
@@ -257,6 +266,4 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
 		}
 		rp.metrics.updatePagesPerInterval(npages)
 	}()
-
-	return ch
 }

From 04c1ff4d036ae136e705db0541e0398fd71e6a7a Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 10:06:46 +0930
Subject: [PATCH 02/16] x-pack/filebeat/input/httpjson: separate publish
 context from publish action and control flow

---
 x-pack/filebeat/input/httpjson/request.go | 78 +++++++++++++++--------
 1 file changed, 52 insertions(+), 26 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index edb32ba1ad3d..70a1e42183e0 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -82,9 +82,10 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 
 			if len(r.requestFactories) == 1 {
 				finalResps = append(finalResps, httpResp)
+				p := newPublisher(trCtx, publisher, true, r.log)
 				events := make(stream)
 				r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, events)
-				n = processAndPublishEvents(trCtx, events, publisher, true, r.log)
+				n = p.processAndPublishEvents(events)
 				continue
 			}
 
@@ -119,9 +120,10 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 				return err
 			}
 			// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
+			p := newPublisher(trCtx, publisher, false, r.log)
 			events := make(stream)
 			r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, events)
-			n = processAndPublishEvents(trCtx, events, publisher, false, r.log)
+			n = p.processAndPublishEvents(events)
 		} else {
 			if len(ids) == 0 {
 				n = 0
@@ -189,13 +191,14 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 				resps = intermediateResps
 			}
 
+			p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
 			events := make(stream)
 			if rf.isChain {
 				rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events)
 			} else {
 				r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, events)
 			}
-			n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
+			n += p.processAndPublishEvents(events)
 		}
 	}
 
@@ -678,9 +681,10 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
 			}
 			resps = intermediateResps
 		}
+		p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
 		events := make(stream)
 		rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events)
-		n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
+		n += p.processAndPublishEvents(events)
 	}
 
 	defer func() {
@@ -701,36 +705,58 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {
 	return *newUrl, nil
 }
 
+type publisher struct {
+	ctx *transformContext
+	pub inputcursor.Publisher
+	log *logp.Logger
+}
+
+func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) publisher {
+	if !publish {
+		pub = nil
+	}
+	return publisher{
+		ctx: trCtx,
+		pub: pub,
+		log: log,
+	}
+}
+
 // processAndPublishEvents process and publish events based on event type
-func processAndPublishEvents(trCtx *transformContext, events stream, publisher inputcursor.Publisher, publish bool, log *logp.Logger) int {
+func (p publisher) processAndPublishEvents(events stream) int {
 	var n int
 	for maybeMsg := range events {
-		if maybeMsg.failed() {
-			log.Errorf("error processing response: %v", maybeMsg)
-			continue
-		}
+		n += p.processAndPublishEvent(maybeMsg)
+	}
+	return n
+}
 
-		if publish {
-			event, err := makeEvent(maybeMsg.msg)
-			if err != nil {
-				log.Errorf("error creating event: %v", maybeMsg)
-				continue
-			}
+// processAndPublishEvent processes and publishes one events based on event type
+func (p publisher) processAndPublishEvent(evt maybeMsg) int {
+	if evt.failed() {
+		p.log.Errorf("error processing response: %v", evt)
+		return 0
+	}
 
-			if err := publisher.Publish(event, trCtx.cursorMap()); err != nil {
-				log.Errorf("error publishing event: %v", err)
-				continue
-			}
-		}
-		if len(*trCtx.firstEventClone()) == 0 {
-			trCtx.updateFirstEvent(maybeMsg.msg)
+	if p.pub != nil {
+		event, err := makeEvent(evt.msg)
+		if err != nil {
+			p.log.Errorf("error creating event: %v", evt)
+			return 0
 		}
-		trCtx.updateLastEvent(maybeMsg.msg)
-		trCtx.updateCursor()
 
-		n++
+		if err := p.pub.Publish(event, p.ctx.cursorMap()); err != nil {
+			p.log.Errorf("error publishing event: %v", err)
+			return 0
+		}
 	}
-	return n
+	if len(*p.ctx.firstEventClone()) == 0 {
+		p.ctx.updateFirstEvent(evt.msg)
+	}
+	p.ctx.updateLastEvent(evt.msg)
+	p.ctx.updateCursor()
+
+	return 1
 }
 
 const (

From 8643c178936c4ef9ad4fcefcadfcdde9c1f0574e Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 10:20:27 +0930
Subject: [PATCH 03/16] x-pack/filebeat/input/httpjson: clean up missed split
 use of channels

---
 x-pack/filebeat/input/httpjson/request.go    | 14 +++---
 x-pack/filebeat/input/httpjson/response.go   | 18 +++++--
 x-pack/filebeat/input/httpjson/split.go      | 50 ++++++++++----------
 x-pack/filebeat/input/httpjson/split_test.go | 10 ++--
 4 files changed, 51 insertions(+), 41 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index 70a1e42183e0..44b68f9f77e4 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -83,7 +83,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 			if len(r.requestFactories) == 1 {
 				finalResps = append(finalResps, httpResp)
 				p := newPublisher(trCtx, publisher, true, r.log)
-				events := make(stream)
+				events := newStream()
 				r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, events)
 				n = p.processAndPublishEvents(events)
 				continue
@@ -121,7 +121,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 			}
 			// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
 			p := newPublisher(trCtx, publisher, false, r.log)
-			events := make(stream)
+			events := newStream()
 			r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, events)
 			n = p.processAndPublishEvents(events)
 		} else {
@@ -192,7 +192,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 			}
 
 			p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
-			events := make(stream)
+			events := newStream()
 			if rf.isChain {
 				rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events)
 			} else {
@@ -546,12 +546,12 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl
 // processRemainingChainEvents, processes the remaining pagination events for chain blocks
 func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
 	// we start from 0, and skip the 1st event since we have already processed it
-	events := make(stream)
+	events := newStream()
 	r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, events)
 
 	var n int
 	var eventCount int
-	for maybeMsg := range events {
+	for maybeMsg := range events.ch {
 		if maybeMsg.failed() {
 			r.log.Errorf("error processing response: %v", maybeMsg)
 			continue
@@ -682,7 +682,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
 			resps = intermediateResps
 		}
 		p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
-		events := make(stream)
+		events := newStream()
 		rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events)
 		n += p.processAndPublishEvents(events)
 	}
@@ -725,7 +725,7 @@ func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bo
 // processAndPublishEvents process and publish events based on event type
 func (p publisher) processAndPublishEvents(events stream) int {
 	var n int
-	for maybeMsg := range events {
+	for maybeMsg := range events.ch {
 		n += p.processAndPublishEvent(maybeMsg)
 	}
 	return n
diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go
index 37e539195c0e..eb914002314a 100644
--- a/x-pack/filebeat/input/httpjson/response.go
+++ b/x-pack/filebeat/input/httpjson/response.go
@@ -180,21 +180,31 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe
 	return rp
 }
 
-type stream chan maybeMsg
+type stream struct {
+	ch chan maybeMsg
+}
+
+func newStream() stream {
+	return stream{make(chan maybeMsg)}
+}
 
 func (s stream) event(e mapstr.M) {
-	s <- maybeMsg{msg: e}
+	s.ch <- maybeMsg{msg: e}
 }
 
 func (s stream) fail(err error) {
-	s <- maybeMsg{err: err}
+	s.ch <- maybeMsg{err: err}
+}
+
+func (s stream) close() {
+	close(s.ch)
 }
 
 func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch stream) {
 	trCtx.clearIntervalData()
 
 	go func() {
-		defer close(ch)
+		defer ch.close()
 		var npages int64
 
 		for i, httpResp := range resps {
diff --git a/x-pack/filebeat/input/httpjson/split.go b/x-pack/filebeat/input/httpjson/split.go
index 8fa892a6ce05..355870111b62 100644
--- a/x-pack/filebeat/input/httpjson/split.go
+++ b/x-pack/filebeat/input/httpjson/split.go
@@ -95,13 +95,13 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
 // run runs the split operation on the contents of resp, sending successive
 // split results on ch. ctx is passed to transforms that are called during
 // the split.
-func (s *split) run(ctx *transformContext, resp transformable, ch chan<- maybeMsg) error {
+func (s *split) run(ctx *transformContext, resp transformable, events stream) error {
 	root := resp.body()
-	return s.split(ctx, root, ch)
+	return s.split(ctx, root, events)
 }
 
 // split recursively executes the split processor chain.
-func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg) error {
+func (s *split) split(ctx *transformContext, root mapstr.M, events stream) error {
 	v, err := root.GetValue(s.targetInfo.Name)
 	if err != nil && err != mapstr.ErrKeyNotFound { //nolint:errorlint // mapstr.ErrKeyNotFound is never wrapped by GetValue.
 		return err
@@ -110,21 +110,21 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg)
 	if v == nil {
 		if s.ignoreEmptyValue {
 			if s.child != nil {
-				return s.child.split(ctx, root, ch)
+				return s.child.split(ctx, root, events)
 			}
 			if s.keepParent {
-				ch <- maybeMsg{msg: root}
+				events.event(root)
 			}
 			return nil
 		}
 		if s.isRoot {
 			if s.keepParent {
-				ch <- maybeMsg{msg: root}
+				events.event(root)
 				return errEmptyField
 			}
 			return errEmptyRootField
 		}
-		ch <- maybeMsg{msg: root}
+		events.event(root)
 		return errEmptyField
 	}
 
@@ -138,23 +138,23 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg)
 		if len(varr) == 0 {
 			if s.ignoreEmptyValue {
 				if s.child != nil {
-					return s.child.split(ctx, root, ch)
+					return s.child.split(ctx, root, events)
 				}
 				if s.keepParent {
-					ch <- maybeMsg{msg: root}
+					events.event(root)
 				}
 				return nil
 			}
 			if s.isRoot {
-				ch <- maybeMsg{msg: root}
+				events.event(root)
 				return errEmptyRootField
 			}
-			ch <- maybeMsg{msg: root}
+			events.event(root)
 			return errEmptyField
 		}
 
 		for _, e := range varr {
-			err := s.sendMessage(ctx, root, s.targetInfo.Name, e, ch)
+			err := s.sendMessage(ctx, root, s.targetInfo.Name, e, events)
 			if err != nil {
 				s.log.Debug(err)
 			}
@@ -170,22 +170,22 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg)
 		if len(vmap) == 0 {
 			if s.ignoreEmptyValue {
 				if s.child != nil {
-					return s.child.split(ctx, root, ch)
+					return s.child.split(ctx, root, events)
 				}
 				if s.keepParent {
-					ch <- maybeMsg{msg: root}
+					events.event(root)
 				}
 				return nil
 			}
 			if s.isRoot {
 				return errEmptyRootField
 			}
-			ch <- maybeMsg{msg: root}
+			events.event(root)
 			return errEmptyField
 		}
 
 		for k, e := range vmap {
-			if err := s.sendMessage(ctx, root, k, e, ch); err != nil {
+			if err := s.sendMessage(ctx, root, k, e, events); err != nil {
 				s.log.Debug(err)
 			}
 		}
@@ -200,18 +200,18 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg)
 		if len(vstr) == 0 {
 			if s.ignoreEmptyValue {
 				if s.child != nil {
-					return s.child.split(ctx, root, ch)
+					return s.child.split(ctx, root, events)
 				}
 				return nil
 			}
 			if s.isRoot {
 				return errEmptyRootField
 			}
-			ch <- maybeMsg{msg: root}
+			events.event(root)
 			return errEmptyField
 		}
 		for _, substr := range strings.Split(vstr, s.delimiter) {
-			if err := s.sendMessageSplitString(ctx, root, substr, ch); err != nil {
+			if err := s.sendMessageSplitString(ctx, root, substr, events); err != nil {
 				s.log.Debug(err)
 			}
 		}
@@ -224,7 +224,7 @@ func (s *split) split(ctx *transformContext, root mapstr.M, ch chan<- maybeMsg)
 
 // sendMessage sends an array or map split result value, v, on ch after performing
 // any necessary transformations. If key is "", the value is an element of an array.
-func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v interface{}, ch chan<- maybeMsg) error {
+func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v interface{}, events stream) error {
 	obj, ok := toMapStr(v, s.targetInfo.Name)
 	if !ok {
 		return errExpectedSplitObj
@@ -252,10 +252,10 @@ func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v
 	}
 
 	if s.child != nil {
-		return s.child.split(ctx, clone, ch)
+		return s.child.split(ctx, clone, events)
 	}
 
-	ch <- maybeMsg{msg: clone}
+	events.event(clone)
 
 	return nil
 }
@@ -277,7 +277,7 @@ func toMapStr(v interface{}, key string) (mapstr.M, bool) {
 
 // sendMessage sends a string split result value, v, on ch after performing any
 // necessary transformations. If key is "", the value is an element of an array.
-func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v string, ch chan<- maybeMsg) error {
+func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v string, events stream) error {
 	clone := root.Clone()
 	_, _ = clone.Put(s.targetInfo.Name, v)
 
@@ -293,10 +293,10 @@ func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v s
 	}
 
 	if s.child != nil {
-		return s.child.split(ctx, clone, ch)
+		return s.child.split(ctx, clone, events)
 	}
 
-	ch <- maybeMsg{msg: clone}
+	events.event(clone)
 
 	return nil
 }
diff --git a/x-pack/filebeat/input/httpjson/split_test.go b/x-pack/filebeat/input/httpjson/split_test.go
index 2c4553e9df64..fa390061c9a4 100644
--- a/x-pack/filebeat/input/httpjson/split_test.go
+++ b/x-pack/filebeat/input/httpjson/split_test.go
@@ -704,19 +704,19 @@ func TestSplit(t *testing.T) {
 	for _, tc := range cases {
 		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
-			ch := make(chan maybeMsg, len(tc.expectedMessages))
+			events := stream{make(chan maybeMsg, len(tc.expectedMessages))}
 			split, err := newSplitResponse(tc.config, logp.NewLogger(""))
 			assert.NoError(t, err)
-			err = split.run(tc.ctx, tc.resp, ch)
+			err = split.run(tc.ctx, tc.resp, events)
 			if tc.expectedErr == nil {
 				assert.NoError(t, err)
 			} else {
 				assert.EqualError(t, err, tc.expectedErr.Error())
 			}
-			close(ch)
-			assert.Equal(t, len(tc.expectedMessages), len(ch))
+			events.close()
+			assert.Equal(t, len(tc.expectedMessages), len(events.ch))
 			for _, msg := range tc.expectedMessages {
-				e := <-ch
+				e := <-events.ch
 				assert.NoError(t, e.err)
 				assert.Equal(t, msg.Flatten(), e.msg.Flatten())
 			}

From 0ba484fa67f3974f8d7fd300f5a2a3ba90636822 Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 10:55:43 +0930
Subject: [PATCH 04/16] x-pack/filebeat/input/httpjson: protect direction

---
 x-pack/filebeat/input/httpjson/response.go | 8 +++++++-
 x-pack/filebeat/input/httpjson/split.go    | 8 ++++----
 2 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go
index eb914002314a..28035cde6109 100644
--- a/x-pack/filebeat/input/httpjson/response.go
+++ b/x-pack/filebeat/input/httpjson/response.go
@@ -180,6 +180,12 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe
 	return rp
 }
 
+type sendStream interface {
+	event(mapstr.M)
+	fail(error)
+	close()
+}
+
 type stream struct {
 	ch chan maybeMsg
 }
@@ -200,7 +206,7 @@ func (s stream) close() {
 	close(s.ch)
 }
 
-func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch stream) {
+func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) {
 	trCtx.clearIntervalData()
 
 	go func() {
diff --git a/x-pack/filebeat/input/httpjson/split.go b/x-pack/filebeat/input/httpjson/split.go
index 355870111b62..0cfb7d3776f4 100644
--- a/x-pack/filebeat/input/httpjson/split.go
+++ b/x-pack/filebeat/input/httpjson/split.go
@@ -95,13 +95,13 @@ func newSplit(c *splitConfig, log *logp.Logger) (*split, error) {
 // run runs the split operation on the contents of resp, sending successive
 // split results on ch. ctx is passed to transforms that are called during
 // the split.
-func (s *split) run(ctx *transformContext, resp transformable, events stream) error {
+func (s *split) run(ctx *transformContext, resp transformable, events sendStream) error {
 	root := resp.body()
 	return s.split(ctx, root, events)
 }
 
 // split recursively executes the split processor chain.
-func (s *split) split(ctx *transformContext, root mapstr.M, events stream) error {
+func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) error {
 	v, err := root.GetValue(s.targetInfo.Name)
 	if err != nil && err != mapstr.ErrKeyNotFound { //nolint:errorlint // mapstr.ErrKeyNotFound is never wrapped by GetValue.
 		return err
@@ -224,7 +224,7 @@ func (s *split) split(ctx *transformContext, root mapstr.M, events stream) error
 
 // sendMessage sends an array or map split result value, v, on ch after performing
 // any necessary transformations. If key is "", the value is an element of an array.
-func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v interface{}, events stream) error {
+func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v interface{}, events sendStream) error {
 	obj, ok := toMapStr(v, s.targetInfo.Name)
 	if !ok {
 		return errExpectedSplitObj
@@ -277,7 +277,7 @@ func toMapStr(v interface{}, key string) (mapstr.M, bool) {
 
 // sendMessage sends a string split result value, v, on ch after performing any
 // necessary transformations. If key is "", the value is an element of an array.
-func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v string, events stream) error {
+func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v string, events sendStream) error {
 	clone := root.Clone()
 	_, _ = clone.Put(s.targetInfo.Name, v)
 

From 514917e94d4a46aa2d71ddec4f65dbcc9f675d09 Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 11:44:34 +0930
Subject: [PATCH 05/16] x-pack/filebeat/input/httpjson: split out count
 handling and make publisher satisfy sendStream

---
 x-pack/filebeat/input/httpjson/request.go | 56 +++++++++++++++--------
 1 file changed, 36 insertions(+), 20 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index 44b68f9f77e4..fa6becb42fae 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -85,7 +85,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 				p := newPublisher(trCtx, publisher, true, r.log)
 				events := newStream()
 				r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, events)
-				n = p.processAndPublishEvents(events)
+				p.processAndPublishEvents(events)
+				n = p.eventCount()
 				continue
 			}
 
@@ -123,7 +124,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 			p := newPublisher(trCtx, publisher, false, r.log)
 			events := newStream()
 			r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, events)
-			n = p.processAndPublishEvents(events)
+			p.processAndPublishEvents(events)
+			n = p.eventCount()
 		} else {
 			if len(ids) == 0 {
 				n = 0
@@ -198,7 +200,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 			} else {
 				r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, events)
 			}
-			n += p.processAndPublishEvents(events)
+			p.processAndPublishEvents(events)
+			n += p.eventCount()
 		}
 	}
 
@@ -684,7 +687,8 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
 		p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
 		events := newStream()
 		rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events)
-		n += p.processAndPublishEvents(events)
+		p.processAndPublishEvents(events)
+		n += p.eventCount()
 	}
 
 	defer func() {
@@ -708,14 +712,15 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {
 type publisher struct {
 	ctx *transformContext
 	pub inputcursor.Publisher
+	n   int
 	log *logp.Logger
 }
 
-func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) publisher {
+func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher {
 	if !publish {
 		pub = nil
 	}
-	return publisher{
+	return &publisher{
 		ctx: trCtx,
 		pub: pub,
 		log: log,
@@ -723,42 +728,53 @@ func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bo
 }
 
 // processAndPublishEvents process and publish events based on event type
-func (p publisher) processAndPublishEvents(events stream) int {
-	var n int
+func (p *publisher) processAndPublishEvents(events stream) {
 	for maybeMsg := range events.ch {
-		n += p.processAndPublishEvent(maybeMsg)
+		p.processAndPublishEvent(maybeMsg)
 	}
-	return n
 }
 
 // processAndPublishEvent processes and publishes one events based on event type
-func (p publisher) processAndPublishEvent(evt maybeMsg) int {
+func (p *publisher) processAndPublishEvent(evt maybeMsg) {
 	if evt.failed() {
-		p.log.Errorf("error processing response: %v", evt)
-		return 0
+		p.fail(evt.err)
+		return
 	}
+	p.event(evt.msg)
+}
 
+func (p *publisher) event(msg mapstr.M) {
 	if p.pub != nil {
-		event, err := makeEvent(evt.msg)
+		event, err := makeEvent(msg)
 		if err != nil {
-			p.log.Errorf("error creating event: %v", evt)
-			return 0
+			p.log.Errorf("error creating event: %v: %v", msg, err)
+			return
 		}
 
 		if err := p.pub.Publish(event, p.ctx.cursorMap()); err != nil {
 			p.log.Errorf("error publishing event: %v", err)
-			return 0
+			return
 		}
 	}
 	if len(*p.ctx.firstEventClone()) == 0 {
-		p.ctx.updateFirstEvent(evt.msg)
+		p.ctx.updateFirstEvent(msg)
 	}
-	p.ctx.updateLastEvent(evt.msg)
+	p.ctx.updateLastEvent(msg)
 	p.ctx.updateCursor()
 
-	return 1
+	p.n++
+}
+
+func (p *publisher) fail(err error) {
+	p.log.Errorf("error processing response: %v", err)
 }
 
+func (p *publisher) eventCount() int {
+	return p.n
+}
+
+func (p *publisher) close() {}
+
 const (
 	// This is generally updated with chain responses, if present, as they continue to occur
 	// Otherwise this is always the last response of the root request w.r.t pagination

From 419225a39d9869fa412287f54ae38f72d1bc89b2 Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 14:31:20 +0930
Subject: [PATCH 06/16] x-pack/filebeat/input/httpjson: replay publisher
 refactors for a chainProcessor type

---
 x-pack/filebeat/input/httpjson/request.go  | 102 ++++++++++++++-------
 x-pack/filebeat/input/httpjson/response.go |   6 +-
 x-pack/filebeat/input/httpjson/split.go    |  25 ++---
 3 files changed, 86 insertions(+), 47 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index fa6becb42fae..7e398689531b 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -549,52 +549,88 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl
 // processRemainingChainEvents, processes the remaining pagination events for chain blocks
 func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
 	// we start from 0, and skip the 1st event since we have already processed it
+	p := newChainProcessor(r, trCtx, publisher, chainIndex)
 	events := newStream()
 	r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, events)
+	p.processRemainingChainEvents(stdCtx, events)
+	return p.eventCount()
+}
+
+type chainProcessor struct {
+	req  *requester
+	ctx  *transformContext
+	pub  inputcursor.Publisher
+	idx  int
+	tail bool
+	n    int
+}
+
+func newChainProcessor(req *requester, trCtx *transformContext, pub inputcursor.Publisher, idx int) *chainProcessor {
+	return &chainProcessor{
+		req: req,
+		ctx: trCtx,
+		pub: pub,
+		idx: idx,
+	}
+}
 
-	var n int
-	var eventCount int
+func (p *chainProcessor) processRemainingChainEvents(ctx context.Context, events stream) {
 	for maybeMsg := range events.ch {
 		if maybeMsg.failed() {
-			r.log.Errorf("error processing response: %v", maybeMsg)
+			p.fail(maybeMsg.err)
 			continue
 		}
 
-		if n >= 1 { // skip 1st event as it has already ben processed before
-			var response http.Response
-			response.StatusCode = 200
-			body := new(bytes.Buffer)
-			// we construct a new response here from each of the pagination events
-			err := json.NewEncoder(body).Encode(maybeMsg.msg)
-			if err != nil {
-				r.log.Errorf("error processing chain event: %w", err)
-				continue
-			}
-			response.Body = io.NopCloser(body)
+		p.event(ctx, maybeMsg.msg)
+	}
+}
 
-			// updates the cursor for pagination last_event & last_response when chaining is present
-			trCtx.updateLastEvent(maybeMsg.msg)
-			trCtx.updateCursor()
+func (p *chainProcessor) event(ctx context.Context, msg mapstr.M) {
+	if !p.tail {
+		// Skip first event as it has already been processed.
+		p.tail = true
+		return
+	}
 
-			// for each pagination response, we repeat all the chain steps / blocks
-			count, err := r.processChainPaginationEvents(stdCtx, trCtx, publisher, &response, chainIndex, r.log)
-			if err != nil {
-				r.log.Errorf("error processing chain event: %w", err)
-				continue
-			}
-			eventCount += count
+	var response http.Response
+	response.StatusCode = 200
+	body := new(bytes.Buffer)
+	// we construct a new response here from each of the pagination events
+	err := json.NewEncoder(body).Encode(msg)
+	if err != nil {
+		p.req.log.Errorf("error processing chain event: %v", err)
+		return
+	}
+	response.Body = io.NopCloser(body)
 
-			err = response.Body.Close()
-			if err != nil {
-				r.log.Errorf("error closing http response body: %w", err)
-			}
-		}
+	// updates the cursor for pagination last_event & last_response when chaining is present
+	p.ctx.updateLastEvent(msg)
+	p.ctx.updateCursor()
+
+	// for each pagination response, we repeat all the chain steps / blocks
+	n, err := p.req.processChainPaginationEvents(ctx, p.ctx, p.pub, &response, p.idx, p.req.log)
+	if err != nil {
+		p.req.log.Errorf("error processing chain event: %v", err)
+		return
+	}
+	p.n += n
 
-		n++
+	err = response.Body.Close()
+	if err != nil {
+		p.req.log.Errorf("error closing http response body: %v", err)
 	}
-	return eventCount
 }
 
+func (p *chainProcessor) fail(err error) {
+	p.req.log.Errorf("error processing response: %v", err)
+}
+
+func (p *chainProcessor) eventCount() int {
+	return p.n
+}
+
+func (*chainProcessor) close() {}
+
 // processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input
 //
 //nolint:bodyclose // response body is closed through drainBody method
@@ -740,10 +776,10 @@ func (p *publisher) processAndPublishEvent(evt maybeMsg) {
 		p.fail(evt.err)
 		return
 	}
-	p.event(evt.msg)
+	p.event(nil, evt.msg)
 }
 
-func (p *publisher) event(msg mapstr.M) {
+func (p *publisher) event(_ context.Context, msg mapstr.M) {
 	if p.pub != nil {
 		event, err := makeEvent(msg)
 		if err != nil {
diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go
index 28035cde6109..dc7de5c3b976 100644
--- a/x-pack/filebeat/input/httpjson/response.go
+++ b/x-pack/filebeat/input/httpjson/response.go
@@ -181,7 +181,7 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe
 }
 
 type sendStream interface {
-	event(mapstr.M)
+	event(context.Context, mapstr.M)
 	fail(error)
 	close()
 }
@@ -194,7 +194,7 @@ func newStream() stream {
 	return stream{make(chan maybeMsg)}
 }
 
-func (s stream) event(e mapstr.M) {
+func (s stream) event(_ context.Context, e mapstr.M) {
 	s.ch <- maybeMsg{msg: e}
 }
 
@@ -252,7 +252,7 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
 					}
 
 					if rp.split == nil {
-						ch.event(tr.body())
+						ch.event(stdCtx, tr.body())
 						rp.log.Debug("no split found: continuing")
 						continue
 					}
diff --git a/x-pack/filebeat/input/httpjson/split.go b/x-pack/filebeat/input/httpjson/split.go
index 0cfb7d3776f4..dc77fbb0fde1 100644
--- a/x-pack/filebeat/input/httpjson/split.go
+++ b/x-pack/filebeat/input/httpjson/split.go
@@ -5,6 +5,7 @@
 package httpjson
 
 import (
+	"context"
 	"errors"
 	"fmt"
 	"strings"
@@ -102,6 +103,8 @@ func (s *split) run(ctx *transformContext, resp transformable, events sendStream
 
 // split recursively executes the split processor chain.
 func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) error {
+	todo := context.TODO()
+
 	v, err := root.GetValue(s.targetInfo.Name)
 	if err != nil && err != mapstr.ErrKeyNotFound { //nolint:errorlint // mapstr.ErrKeyNotFound is never wrapped by GetValue.
 		return err
@@ -113,18 +116,18 @@ func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) e
 				return s.child.split(ctx, root, events)
 			}
 			if s.keepParent {
-				events.event(root)
+				events.event(todo, root)
 			}
 			return nil
 		}
 		if s.isRoot {
 			if s.keepParent {
-				events.event(root)
+				events.event(todo, root)
 				return errEmptyField
 			}
 			return errEmptyRootField
 		}
-		events.event(root)
+		events.event(todo, root)
 		return errEmptyField
 	}
 
@@ -141,15 +144,15 @@ func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) e
 					return s.child.split(ctx, root, events)
 				}
 				if s.keepParent {
-					events.event(root)
+					events.event(todo, root)
 				}
 				return nil
 			}
 			if s.isRoot {
-				events.event(root)
+				events.event(todo, root)
 				return errEmptyRootField
 			}
-			events.event(root)
+			events.event(todo, root)
 			return errEmptyField
 		}
 
@@ -173,14 +176,14 @@ func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) e
 					return s.child.split(ctx, root, events)
 				}
 				if s.keepParent {
-					events.event(root)
+					events.event(todo, root)
 				}
 				return nil
 			}
 			if s.isRoot {
 				return errEmptyRootField
 			}
-			events.event(root)
+			events.event(todo, root)
 			return errEmptyField
 		}
 
@@ -207,7 +210,7 @@ func (s *split) split(ctx *transformContext, root mapstr.M, events sendStream) e
 			if s.isRoot {
 				return errEmptyRootField
 			}
-			events.event(root)
+			events.event(todo, root)
 			return errEmptyField
 		}
 		for _, substr := range strings.Split(vstr, s.delimiter) {
@@ -255,7 +258,7 @@ func (s *split) sendMessage(ctx *transformContext, root mapstr.M, key string, v
 		return s.child.split(ctx, clone, events)
 	}
 
-	events.event(clone)
+	events.event(context.TODO(), clone)
 
 	return nil
 }
@@ -296,7 +299,7 @@ func (s *split) sendMessageSplitString(ctx *transformContext, root mapstr.M, v s
 		return s.child.split(ctx, clone, events)
 	}
 
-	events.event(clone)
+	events.event(context.TODO(), clone)
 
 	return nil
 }

From e6bd82f25d6fff9cd6c441043996883bf6786b2f Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 14:35:35 +0930
Subject: [PATCH 07/16] x-pack/filebeat/input/httpjson: add sequential version
 of *responseProcessor.startProcessing

---
 x-pack/filebeat/input/httpjson/response.go | 76 ++++++++++++++++++++++
 1 file changed, 76 insertions(+)

diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go
index dc7de5c3b976..e744069d2725 100644
--- a/x-pack/filebeat/input/httpjson/response.go
+++ b/x-pack/filebeat/input/httpjson/response.go
@@ -283,3 +283,79 @@ func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *tran
 		rp.metrics.updatePagesPerInterval(npages)
 	}()
 }
+
+func (rp *responseProcessor) startProcessingSeq(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) {
+	trCtx.clearIntervalData()
+
+	defer ch.close()
+	var npages int64
+
+	for i, httpResp := range resps {
+		iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
+		for {
+			pageStartTime := time.Now()
+			page, hasNext, err := iter.next()
+			if err != nil {
+				ch.fail(err)
+				return
+			}
+
+			if !hasNext {
+				if i+1 != len(resps) {
+					break
+				}
+				return
+			}
+
+			respTrs := page.asTransformables(rp.log)
+
+			if len(respTrs) == 0 {
+				return
+			}
+
+			// last_response context object is updated here organically
+			trCtx.updateLastResponse(*page)
+			npages = page.page
+
+			rp.log.Debugf("last received page: %#v", trCtx.lastResponse)
+
+			for _, tr := range respTrs {
+				for _, t := range rp.transforms {
+					tr, err = t.run(trCtx, tr)
+					if err != nil {
+						ch.fail(err)
+						return
+					}
+				}
+
+				if rp.split == nil {
+					ch.event(stdCtx, tr.body())
+					rp.log.Debug("no split found: continuing")
+					continue
+				}
+
+				if err := rp.split.run(trCtx, tr, ch); err != nil {
+					switch err { //nolint:errorlint // run never returns a wrapped error.
+					case errEmptyField:
+						// nothing else to send for this page
+						rp.log.Debug("split operation finished")
+					case errEmptyRootField:
+						// root field not found, most likely the response is empty
+						rp.log.Debug(err)
+					default:
+						rp.log.Debug("split operation failed")
+						ch.fail(err)
+						return
+					}
+				}
+			}
+
+			rp.metrics.updatePageExecutionTime(pageStartTime)
+
+			if !paginate {
+				break
+			}
+		}
+	}
+	rp.metrics.updatePagesPerInterval(npages)
+}

From 51c124775cf961a2d88748be7f094fcd9a89724c Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 14:36:57 +0930
Subject: [PATCH 08/16] x-pack/filebeat/input/httpjson: do chain processing
 sequentially

---
 x-pack/filebeat/input/httpjson/request.go | 15 +--------------
 1 file changed, 1 insertion(+), 14 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index 7e398689531b..cc5cf74d2c75 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -550,9 +550,7 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl
 func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
 	// we start from 0, and skip the 1st event since we have already processed it
 	p := newChainProcessor(r, trCtx, publisher, chainIndex)
-	events := newStream()
-	r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, events)
-	p.processRemainingChainEvents(stdCtx, events)
+	r.responseProcessors[0].startProcessingSeq(stdCtx, trCtx, initialResp, true, p)
 	return p.eventCount()
 }
 
@@ -574,17 +572,6 @@ func newChainProcessor(req *requester, trCtx *transformContext, pub inputcursor.
 	}
 }
 
-func (p *chainProcessor) processRemainingChainEvents(ctx context.Context, events stream) {
-	for maybeMsg := range events.ch {
-		if maybeMsg.failed() {
-			p.fail(maybeMsg.err)
-			continue
-		}
-
-		p.event(ctx, maybeMsg.msg)
-	}
-}
-
 func (p *chainProcessor) event(ctx context.Context, msg mapstr.M) {
 	if !p.tail {
 		// Skip first event as it has already been processed.

From f39c130e5fe0edc118ed15684458d01977e5abb9 Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 14:38:58 +0930
Subject: [PATCH 09/16] x-pack/filebeat/input/httpjson: do first call
 sequentially

---
 x-pack/filebeat/input/httpjson/request.go | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index cc5cf74d2c75..502ba6ac8651 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -83,9 +83,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 			if len(r.requestFactories) == 1 {
 				finalResps = append(finalResps, httpResp)
 				p := newPublisher(trCtx, publisher, true, r.log)
-				events := newStream()
-				r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, events)
-				p.processAndPublishEvents(events)
+				r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, finalResps, true, p)
 				n = p.eventCount()
 				continue
 			}

From 7e0ca8b33d3b90abaa55b5dbc84087b8efdf3f6d Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 14:45:07 +0930
Subject: [PATCH 10/16] x-pack/filebeat/input/httpjson: do second call
 sequentially

---
 x-pack/filebeat/input/httpjson/request.go | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index 502ba6ac8651..e83fc23cb605 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -120,9 +120,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 			}
 			// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
 			p := newPublisher(trCtx, publisher, false, r.log)
-			events := newStream()
-			r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, events)
-			p.processAndPublishEvents(events)
+			r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, finalResps, false, p)
 			n = p.eventCount()
 		} else {
 			if len(ids) == 0 {

From 8d6031186f7a2bfac1392e254505a473fd509300 Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 14:45:29 +0930
Subject: [PATCH 11/16] x-pack/filebeat/input/httpjson: do third call
 sequentially

---
 x-pack/filebeat/input/httpjson/request.go | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index e83fc23cb605..07d68f05cc94 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -190,13 +190,11 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 			}
 
 			p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
-			events := newStream()
 			if rf.isChain {
-				rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events)
+				rf.chainResponseProcessor.startProcessingSeq(stdCtx, chainTrCtx, resps, true, p)
 			} else {
-				r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, events)
+				r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, resps, true, p)
 			}
-			p.processAndPublishEvents(events)
 			n += p.eventCount()
 		}
 	}

From 2b2f9427e7a9924a4ab87c156be97fccda358d97 Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 14:45:43 +0930
Subject: [PATCH 12/16] x-pack/filebeat/input/httpjson: do last call
 sequentially

---
 x-pack/filebeat/input/httpjson/request.go | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index 07d68f05cc94..1556b6097672 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -702,9 +702,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
 			resps = intermediateResps
 		}
 		p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
-		events := newStream()
-		rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, events)
-		p.processAndPublishEvents(events)
+		rf.chainResponseProcessor.startProcessingSeq(stdCtx, chainTrCtx, resps, true, p)
 		n += p.eventCount()
 	}
 

From 8e4f06149bb0c3bf506ae5f3dee9e18422977487 Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 14:48:17 +0930
Subject: [PATCH 13/16] x-pack/filebeat/input/httpjson: remove concurrent
 processing

---
 x-pack/filebeat/input/httpjson/request.go  | 12 ++--
 x-pack/filebeat/input/httpjson/response.go | 78 ----------------------
 2 files changed, 6 insertions(+), 84 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index 1556b6097672..d8952896a99c 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -83,7 +83,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 			if len(r.requestFactories) == 1 {
 				finalResps = append(finalResps, httpResp)
 				p := newPublisher(trCtx, publisher, true, r.log)
-				r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, finalResps, true, p)
+				r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, p)
 				n = p.eventCount()
 				continue
 			}
@@ -120,7 +120,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 			}
 			// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
 			p := newPublisher(trCtx, publisher, false, r.log)
-			r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, finalResps, false, p)
+			r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, p)
 			n = p.eventCount()
 		} else {
 			if len(ids) == 0 {
@@ -191,9 +191,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
 
 			p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
 			if rf.isChain {
-				rf.chainResponseProcessor.startProcessingSeq(stdCtx, chainTrCtx, resps, true, p)
+				rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
 			} else {
-				r.responseProcessors[i].startProcessingSeq(stdCtx, trCtx, resps, true, p)
+				r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, p)
 			}
 			n += p.eventCount()
 		}
@@ -544,7 +544,7 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl
 func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
 	// we start from 0, and skip the 1st event since we have already processed it
 	p := newChainProcessor(r, trCtx, publisher, chainIndex)
-	r.responseProcessors[0].startProcessingSeq(stdCtx, trCtx, initialResp, true, p)
+	r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, p)
 	return p.eventCount()
 }
 
@@ -702,7 +702,7 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
 			resps = intermediateResps
 		}
 		p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
-		rf.chainResponseProcessor.startProcessingSeq(stdCtx, chainTrCtx, resps, true, p)
+		rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
 		n += p.eventCount()
 	}
 
diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go
index e744069d2725..56ea94e4461c 100644
--- a/x-pack/filebeat/input/httpjson/response.go
+++ b/x-pack/filebeat/input/httpjson/response.go
@@ -209,84 +209,6 @@ func (s stream) close() {
 func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) {
 	trCtx.clearIntervalData()
 
-	go func() {
-		defer ch.close()
-		var npages int64
-
-		for i, httpResp := range resps {
-			iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
-			for {
-				pageStartTime := time.Now()
-				page, hasNext, err := iter.next()
-				if err != nil {
-					ch.fail(err)
-					return
-				}
-
-				if !hasNext {
-					if i+1 != len(resps) {
-						break
-					}
-					return
-				}
-
-				respTrs := page.asTransformables(rp.log)
-
-				if len(respTrs) == 0 {
-					return
-				}
-
-				// last_response context object is updated here organically
-				trCtx.updateLastResponse(*page)
-				npages = page.page
-
-				rp.log.Debugf("last received page: %#v", trCtx.lastResponse)
-
-				for _, tr := range respTrs {
-					for _, t := range rp.transforms {
-						tr, err = t.run(trCtx, tr)
-						if err != nil {
-							ch.fail(err)
-							return
-						}
-					}
-
-					if rp.split == nil {
-						ch.event(stdCtx, tr.body())
-						rp.log.Debug("no split found: continuing")
-						continue
-					}
-
-					if err := rp.split.run(trCtx, tr, ch); err != nil {
-						switch err { //nolint:errorlint // run never returns a wrapped error.
-						case errEmptyField:
-							// nothing else to send for this page
-							rp.log.Debug("split operation finished")
-						case errEmptyRootField:
-							// root field not found, most likely the response is empty
-							rp.log.Debug(err)
-						default:
-							rp.log.Debug("split operation failed")
-							ch.fail(err)
-							return
-						}
-					}
-				}
-
-				rp.metrics.updatePageExecutionTime(pageStartTime)
-
-				if !paginate {
-					break
-				}
-			}
-		}
-		rp.metrics.updatePagesPerInterval(npages)
-	}()
-}
-
-func (rp *responseProcessor) startProcessingSeq(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) {
-	trCtx.clearIntervalData()
-
 	defer ch.close()
 	var npages int64
 

From 9bd1ea9e2b72cc0d3e92285b1ff941412da3f0f8 Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 14:51:06 +0930
Subject: [PATCH 14/16] x-pack/filebeat/input/httpjson: remove now redundant
 locks

---
 x-pack/filebeat/input/httpjson/transform.go | 29 ---------------------
 1 file changed, 29 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/transform.go b/x-pack/filebeat/input/httpjson/transform.go
index d4055889bf04..be9e938756e5 100644
--- a/x-pack/filebeat/input/httpjson/transform.go
+++ b/x-pack/filebeat/input/httpjson/transform.go
@@ -10,7 +10,6 @@ import (
 	"net/http"
 	"net/url"
 	"strconv"
-	"sync"
 
 	"github.com/elastic/beats/v7/libbeat/common"
 	conf "github.com/elastic/elastic-agent-libs/config"
@@ -25,7 +24,6 @@ type transformsConfig []*conf.C
 type transforms []transform
 
 type transformContext struct {
-	lock          sync.RWMutex
 	cursor        *cursor
 	parentTrCtx   *transformContext
 	firstEvent    *mapstr.M
@@ -45,41 +43,28 @@ func emptyTransformContext() *transformContext {
 }
 
 func (ctx *transformContext) cursorMap() mapstr.M {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
 	return ctx.cursor.clone()
 }
 
 func (ctx *transformContext) lastEventClone() *mapstr.M {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
 	clone := ctx.lastEvent.Clone()
 	return &clone
 }
 
 func (ctx *transformContext) firstEventClone() *mapstr.M {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
 	clone := ctx.firstEvent.Clone()
 	return &clone
 }
 
 func (ctx *transformContext) firstResponseClone() *response {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
 	return ctx.firstResponse.clone()
 }
 
 func (ctx *transformContext) lastResponseClone() *response {
-	ctx.lock.RLock()
-	defer ctx.lock.RUnlock()
 	return ctx.lastResponse.clone()
 }
 
 func (ctx *transformContext) updateCursor() {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
-
 	// we do not want to pass the cursor data to itself
 	newCtx := emptyTransformContext()
 	newCtx.lastEvent = ctx.lastEvent
@@ -91,8 +76,6 @@ func (ctx *transformContext) updateCursor() {
 }
 
 func (ctx *transformContext) clone() *transformContext {
-	ctx.lock.Lock()
-
 	newCtx := emptyTransformContext()
 	newCtx.lastEvent = ctx.lastEvent
 	newCtx.firstEvent = ctx.firstEvent
@@ -100,41 +83,29 @@ func (ctx *transformContext) clone() *transformContext {
 	newCtx.firstResponse = ctx.firstResponse
 	newCtx.cursor = ctx.cursor
 	newCtx.parentTrCtx = ctx
-
-	ctx.lock.Unlock()
 	return newCtx
 }
 
 func (ctx *transformContext) updateLastEvent(e mapstr.M) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	*ctx.lastEvent = e
 }
 
 func (ctx *transformContext) updateFirstEvent(e mapstr.M) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	*ctx.firstEvent = e
 }
 
 func (ctx *transformContext) updateLastResponse(r response) {
-	ctx.lock.Lock()
-	defer ctx.lock.Unlock()
 	*ctx.lastResponse = r
 }
 
 func (ctx *transformContext) updateFirstResponse(r response) {
-	ctx.lock.Lock()
 	*ctx.firstResponse = r
-	ctx.lock.Unlock()
 }
 
 func (ctx *transformContext) clearIntervalData() {
-	ctx.lock.Lock()
 	ctx.lastEvent = &mapstr.M{}
 	ctx.firstEvent = &mapstr.M{}
 	ctx.lastResponse = &response{}
-	ctx.lock.Unlock()
 }
 
 type transformable mapstr.M

From 334b4ee7e293ac1e9e23d0c7e1c16a6f9e162483 Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 15:00:19 +0930
Subject: [PATCH 15/16] x-pack/filebeat/input/httpjson: clean up scaffolding

---
 x-pack/filebeat/input/httpjson/request.go    | 20 ---------------
 x-pack/filebeat/input/httpjson/response.go   | 23 -----------------
 x-pack/filebeat/input/httpjson/split_test.go | 26 ++++++++++++++------
 3 files changed, 18 insertions(+), 51 deletions(-)

diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index d8952896a99c..6a1d926ab40c 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -610,8 +610,6 @@ func (p *chainProcessor) eventCount() int {
 	return p.n
 }
 
-func (*chainProcessor) close() {}
-
 // processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input
 //
 //nolint:bodyclose // response body is closed through drainBody method
@@ -742,22 +740,6 @@ func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bo
 	}
 }
 
-// processAndPublishEvents process and publish events based on event type
-func (p *publisher) processAndPublishEvents(events stream) {
-	for maybeMsg := range events.ch {
-		p.processAndPublishEvent(maybeMsg)
-	}
-}
-
-// processAndPublishEvent processes and publishes one events based on event type
-func (p *publisher) processAndPublishEvent(evt maybeMsg) {
-	if evt.failed() {
-		p.fail(evt.err)
-		return
-	}
-	p.event(nil, evt.msg)
-}
-
 func (p *publisher) event(_ context.Context, msg mapstr.M) {
 	if p.pub != nil {
 		event, err := makeEvent(msg)
@@ -788,8 +770,6 @@ func (p *publisher) eventCount() int {
 	return p.n
 }
 
-func (p *publisher) close() {}
-
 const (
 	// This is generally updated with chain responses, if present, as they continue to occur
 	// Otherwise this is always the last response of the root request w.r.t pagination
diff --git a/x-pack/filebeat/input/httpjson/response.go b/x-pack/filebeat/input/httpjson/response.go
index 56ea94e4461c..2b8c5de5f4ee 100644
--- a/x-pack/filebeat/input/httpjson/response.go
+++ b/x-pack/filebeat/input/httpjson/response.go
@@ -183,35 +183,12 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe
 type sendStream interface {
 	event(context.Context, mapstr.M)
 	fail(error)
-	close()
-}
-
-type stream struct {
-	ch chan maybeMsg
-}
-
-func newStream() stream {
-	return stream{make(chan maybeMsg)}
-}
-
-func (s stream) event(_ context.Context, e mapstr.M) {
-	s.ch <- maybeMsg{msg: e}
-}
-
-func (s stream) fail(err error) {
-	s.ch <- maybeMsg{err: err}
-}
-
-func (s stream) close() {
-	close(s.ch)
 }
 
 func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) {
 	trCtx.clearIntervalData()
 
-	defer ch.close()
 	var npages int64
-
 	for i, httpResp := range resps {
 		iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
 		for {
diff --git a/x-pack/filebeat/input/httpjson/split_test.go b/x-pack/filebeat/input/httpjson/split_test.go
index fa390061c9a4..10f7a40567dc 100644
--- a/x-pack/filebeat/input/httpjson/split_test.go
+++ b/x-pack/filebeat/input/httpjson/split_test.go
@@ -5,6 +5,7 @@
 package httpjson
 
 import (
+	"context"
 	"testing"
 
 	"github.com/stretchr/testify/assert"
@@ -702,9 +703,8 @@ func TestSplit(t *testing.T) {
 	}
 
 	for _, tc := range cases {
-		tc := tc
 		t.Run(tc.name, func(t *testing.T) {
-			events := stream{make(chan maybeMsg, len(tc.expectedMessages))}
+			events := &stream{t: t}
 			split, err := newSplitResponse(tc.config, logp.NewLogger(""))
 			assert.NoError(t, err)
 			err = split.run(tc.ctx, tc.resp, events)
@@ -713,13 +713,23 @@ func TestSplit(t *testing.T) {
 			} else {
 				assert.EqualError(t, err, tc.expectedErr.Error())
 			}
-			events.close()
-			assert.Equal(t, len(tc.expectedMessages), len(events.ch))
-			for _, msg := range tc.expectedMessages {
-				e := <-events.ch
-				assert.NoError(t, e.err)
-				assert.Equal(t, msg.Flatten(), e.msg.Flatten())
+			assert.Equal(t, len(tc.expectedMessages), len(events.collected))
+			for i, msg := range tc.expectedMessages {
+				assert.Equal(t, msg.Flatten(), events.collected[i].Flatten())
 			}
 		})
 	}
 }
+
+type stream struct {
+	collected []mapstr.M
+	t         *testing.T
+}
+
+func (s *stream) event(_ context.Context, msg mapstr.M) {
+	s.collected = append(s.collected, msg)
+}
+
+func (s *stream) fail(err error) {
+	s.t.Errorf("fail: %v", err)
+}

From f0bc64b31627934dbabec35f47e461f0bad55c0e Mon Sep 17 00:00:00 2001
From: Dan Kortschak <dan.kortschak@elastic.co>
Date: Mon, 4 Sep 2023 17:34:42 +0930
Subject: [PATCH 16/16] add changelog entry

---
 CHANGELOG-developer.next.asciidoc | 1 +
 1 file changed, 1 insertion(+)

diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc
index 3b087d19a9de..2e918d757dce 100644
--- a/CHANGELOG-developer.next.asciidoc
+++ b/CHANGELOG-developer.next.asciidoc
@@ -168,6 +168,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
 - Pin PyYAML version to 5.3.1 to avoid CI errors temporarily {pull}36091[36091]
 - Skip dependabot updates for github.com/elastic/mito. {pull}36158[36158]
 - Add device handling to Okta API package for entity analytics. {pull}35980[35980]
+- Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493]
 
 ==== Deprecated