[filebeat][httpjson] - Separation of global transform contexts and introduction of parent transform context within chains #33499

Merged
merged 11 commits into from
Nov 8, 2022
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Expand Up @@ -158,7 +158,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896]
- Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377]
- Improve httpjson documentation for split processor. {pull}33473[33473]

- Added separation of transform context object inside httpjson. Introduced new clause `.parent_last_response.*` {pull}33499[33499]
*Heartbeat*

- Add new states field for internal use by new synthetics app. {pull}30632[30632]
11 changes: 8 additions & 3 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
Expand Up @@ -531,10 +531,13 @@ List of transforms to apply to the request before each execution.

Available transforms for request: [`append`, `delete`, `set`].

Can read state from: [`.first_response.*`,`.last_response.*`, `.last_event.*`, `.cursor.*`, `.header.*`, `.url.*`, `.body.*`].
Can read state from: [`.first_response.*`,`.last_response.*`, `.parent_last_response.*`, `.last_event.*`, `.cursor.*`, `.header.*`, `.url.*`, `.body.*`].

Can write state to: [`body.*`, `header.*`, `url.*`].

NOTE: The clause `.parent_last_response.` should only be used from within chain steps and when pagination exists at the root request level. If pagination
does not exist at the root level, use the clause `.first_response.` to access the parent response object from within chains.

["source","yaml",subs="attributes"]
----
filebeat.inputs:
Expand Down Expand Up @@ -1141,7 +1144,7 @@ Collect and make events from response in any format supported by httpjson for al

The `replace_with: "pattern,value"` clause is used to replace a fixed pattern string defined in `request.url` with the given value.
The fixed pattern must have a `$.` prefix, for example: `$.xyz`. The `value` may be hard coded or extracted from context variables
like [`.last_response.*`, `.first_response.*`] etc. The `replace_with` clause can be used in combination with the `replace` clause
like [`last_response.*`, `first_response.*`, `parent_last_response.*`] etc. The `replace_with` clause can be used in combination with the `replace` clause
thus providing a lot of flexibility in the logic of chain requests.

Example:
Expand Down Expand Up @@ -1218,7 +1221,9 @@ response_json using exportId as '2212':
This behaviour of targeted fixed pattern replacement in the url helps solve various use cases.

NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the
`replace_with` processor with exact string matching.
`replace_with` processor with exact string matching. The `value` should not start with a `.` (dot). For example, to
extract the value from the `last_response` context object, the `replace_with` clause should look like `replace_with:'$.xyz_pattern,last_response.body.id'`
and not `replace_with:'$.xyz_pattern,.last_response.body.id'`.
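
The `pattern,value` rule in the NOTE above can be illustrated with a minimal Go sketch. It is not the httpjson implementation: `splitReplaceWith` and `applyFixedPattern` are hypothetical helpers, and the checks are stricter than the diff, which splits on every comma and reads the second element without validating it.

```go
package main

import (
	"fmt"
	"strings"
)

// splitReplaceWith is a hypothetical helper (not part of httpjson). It splits a
// "pattern,value" clause on commas, trims whitespace, and enforces the rules
// stated in the documentation note.
func splitReplaceWith(clause string) (pattern, value string, err error) {
	parts := strings.Split(clause, ",")
	if len(parts) != 2 {
		return "", "", fmt.Errorf("replace_with must have the form pattern,value: %q", clause)
	}
	pattern = strings.TrimSpace(parts[0])
	value = strings.TrimSpace(parts[1])
	if !strings.HasPrefix(pattern, "$.") {
		return "", "", fmt.Errorf("fixed pattern must start with \"$.\": %q", pattern)
	}
	if strings.HasPrefix(value, ".") {
		return "", "", fmt.Errorf("value must not start with a dot: %q", value)
	}
	return pattern, value, nil
}

// applyFixedPattern replaces every exact occurrence of pattern in rawURL with
// the already resolved value.
func applyFixedPattern(rawURL, pattern, resolved string) string {
	return strings.ReplaceAll(rawURL, pattern, resolved)
}

func main() {
	pattern, value, err := splitReplaceWith("$.exportId,parent_last_response.body.exportId")
	if err != nil {
		panic(err)
	}
	fmt.Println(pattern, value)
	// "2212" stands in for the value that would be resolved from the context.
	fmt.Println(applyFixedPattern("https://example.com/$.exportId/$.files[:].id", pattern, "2212"))
}
```

Rejecting a leading dot up front gives a clearer message than letting the lookup fail later on an empty first key.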

[float]
==== `chain[].while`
89 changes: 87 additions & 2 deletions x-pack/filebeat/input/httpjson/input_test.go
Expand Up @@ -7,7 +7,7 @@ package httpjson
import (
"context"
"fmt"
"io/ioutil"
"io"
"math/rand"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -694,6 +694,91 @@ func TestInput(t *testing.T) {
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test global transform context separation with parent_last_response object",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
var serverURL string
registerPaginationTransforms()
registerRequestTransforms()
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
link := serverURL + "/link1"
value := fmt.Sprintf(`{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212", "nextLink":"%s"}`, link)
fmt.Fprintln(w, value)
case "/link1":
fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213"}`)
case "/2212/1":
body, _ := io.ReadAll(r.Body)
r.Body.Close()
if string(body) == `{"exportId":"2212"}` {
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
}
case "/2212/2":
body, _ := io.ReadAll(r.Body)
r.Body.Close()
if string(body) == `{"exportId":"2212"}` {
fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`)
}
case "/2213/3":
body, _ := io.ReadAll(r.Body)
r.Body.Close()
if string(body) == `{"exportId":"2213"}` {
fmt.Fprintln(w, `{"hello":{"cake":"pumpkin"}}`)
}
case "/2213/4":
body, _ := io.ReadAll(r.Body)
r.Body.Close()
if string(body) == `{"exportId":"2213"}` {
fmt.Fprintln(w, `{"space":{"world":"moon"}}`)
}
}
})
server := httptest.NewServer(r)
t.Cleanup(func() { registeredTransforms = newRegistry() })
config["request.url"] = server.URL
serverURL = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodPost,
"response.request_body_on_pagination": true,
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[.last_response.body.nextLink]]",
"fail_on_template_error": true,
},
},
},
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodPost,
"replace": "$.files[:].id",
"replace_with": "$.exportId,parent_last_response.body.exportId",
"request.transforms": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.exportId",
"value": "[[ .parent_last_response.body.exportId ]]",
},
},
},
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
`{"hello":{"cake":"pumpkin"}}`,
`{"space":{"world":"moon"}}`,
},
},
}

for _, testCase := range testCases {
Expand Down Expand Up @@ -837,7 +922,7 @@ func defaultHandler(expectedMethod, expectedBody, msg string) http.HandlerFunc {
w.WriteHeader(http.StatusBadRequest)
msg = fmt.Sprintf(`{"error":"expected method was %q"}`, expectedMethod)
case expectedBody != "":
body, _ := ioutil.ReadAll(r.Body)
body, _ := io.ReadAll(r.Body)
r.Body.Close()
if expectedBody != string(body) {
w.WriteHeader(http.StatusBadRequest)
29 changes: 14 additions & 15 deletions x-pack/filebeat/input/httpjson/request.go
Expand Up @@ -303,7 +303,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
return fmt.Errorf("failed to execute rf.collectResponse: %w", err)
}
// store first response in transform context
var bodyMap mapstr.M
var bodyMap map[string]interface{}
body, err := io.ReadAll(httpResp.Body)
if err != nil {
return fmt.Errorf("failed to read http response body: %w", err)
Expand All @@ -319,6 +319,8 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
body: bodyMap,
}
trCtx.updateFirstResponse(firstResponse)
// initially, the first response and the last response are the same
trCtx.updateLastResponse(firstResponse)

if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
Expand Down Expand Up @@ -357,7 +359,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
urlCopy = rf.url
urlString = rf.url.String()

// new transform context for every chain step , derived from parent transform context
// new transform context for every chain step, derived from parent transform context
var chainTrCtx *transformContext
if rf.isChain {
chainTrCtx = trCtx.clone()
Expand All @@ -368,7 +370,7 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
var replaceArr []string
if rf.replaceWith != "" {
replaceArr = strings.Split(rf.replaceWith, ",")
val, doReplaceWith, err = fetchValueFromContext(trCtx, strings.TrimSpace(replaceArr[1]))
val, doReplaceWith, err = fetchValueFromContext(chainTrCtx, strings.TrimSpace(replaceArr[1]))
if err != nil {
return err
}
Expand Down Expand Up @@ -418,11 +420,11 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p

var events <-chan maybeMsg
if rf.isChain {
events = rf.chainResponseProcessor.startProcessing(stdCtx, trCtx, resps)
events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps)
} else {
events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps)
}
n += processAndPublishEvents(trCtx, events, publisher, i < len(r.requestFactories), r.log)
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
}
}

Expand Down Expand Up @@ -517,7 +519,7 @@ func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, pu
return n
}

// processRemainingChainEvents , processes the remaining pagination events for chain blocks
// processRemainingChainEvents processes the remaining pagination events for chain blocks
func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
// we start from 0, and skip the 1st event since we have already processed it
events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp)
Expand All @@ -542,7 +544,7 @@ func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *t
}
response.Body = io.NopCloser(body)

// for each pagination response , we repeat all the chain steps / blocks
// for each pagination response, we repeat all the chain steps / blocks
count, err := r.processChainPaginationEvents(stdCtx, trCtx, publisher, &response, chainIndex, r.log)
if err != nil {
r.log.Errorf("error processing chain event: %v", err)
Expand Down Expand Up @@ -592,18 +594,15 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
urlCopy = rf.url
urlString = rf.url.String()

// new transform context for every chain step , derived from parent transform context
var chainTrCtx *transformContext
if rf.isChain {
chainTrCtx = trCtx.clone()
}
// new transform context for every chain step, derived from parent transform context
chainTrCtx := trCtx.clone()

var val string
var doReplaceWith bool
var replaceArr []string
if rf.replaceWith != "" {
replaceArr = strings.Split(rf.replaceWith, ",")
val, doReplaceWith, err = fetchValueFromContext(trCtx, strings.TrimSpace(replaceArr[1]))
val, doReplaceWith, err = fetchValueFromContext(chainTrCtx, strings.TrimSpace(replaceArr[1]))
if err != nil {
return n, err
}
Expand Down Expand Up @@ -651,8 +650,8 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
}
resps = intermediateResps
}
events := rf.chainResponseProcessor.startProcessing(stdCtx, trCtx, resps)
n += processAndPublishEvents(trCtx, events, publisher, i < len(r.requestFactories), r.log)
events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps)
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
}

defer httpResp.Body.Close()
39 changes: 24 additions & 15 deletions x-pack/filebeat/input/httpjson/request_chain_helper.go
Expand Up @@ -21,8 +21,14 @@ import (
)

const (
lastResponse = "last_response"
// This is generally updated with chain responses, if present, as they continue to occur
// Otherwise this is always the last response of the root request w.r.t pagination
lastResponse = "last_response"
// This is always the first root response
firstResponse = "first_response"
// This is always the last response of the parent (root) request w.r.t pagination
// This is only set if chaining is used
parentLastResponse = "parent_last_response"
)

func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *requestConfig, log *logp.Logger, p ...*Policy) (*httpClient, error) {
Expand Down Expand Up @@ -100,7 +106,16 @@ func fetchValueFromContext(trCtx *transformContext, expression string) (string,

switch keys := strings.Split(expression, "."); keys[0] {
case lastResponse:
respMap, err := responseToMap(trCtx.lastResponse, true)
respMap, err := responseToMap(trCtx.lastResponse)
if err != nil {
return "", false, err
}
val, err = iterateRecursive(respMap, keys[1:], 0)
if err != nil {
return "", false, err
}
case parentLastResponse:
respMap, err := responseToMap(trCtx.parentTrCtx.lastResponse)
if err != nil {
return "", false, err
}
Expand All @@ -110,22 +125,25 @@ func fetchValueFromContext(trCtx *transformContext, expression string) (string,
}
case firstResponse:
// since first response body is already a map, we do not need to transform it
respMap, err := responseToMap(trCtx.firstResponse, false)
respMap, err := responseToMap(trCtx.firstResponse)
if err != nil {
return "", false, err
}
val, err = iterateRecursive(respMap, keys[1:], 0)
if err != nil {
return "", false, err
}
// In this scenario we treat the expression as a hardcoded value, with which we will replace the fixed-pattern
case expression:
return expression, true, nil
default:
return "", false, fmt.Errorf("context value not supported: %q in %q", keys[0], expression)
}

return fmt.Sprint(val), true, nil
}
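
The lookup performed by `fetchValueFromContext` can be sketched in isolation as follows. This is a simplified illustration under stated assumptions, not the code in this PR: `lookup` and `resolve` are hypothetical names, contexts are plain nested `map[string]interface{}` values, and only map traversal is handled. As in the diff, an expression without dots that names no known context is treated as a hard-coded value.

```go
package main

import (
	"encoding/json"
	"errors"
	"fmt"
	"strings"
)

// lookup walks a nested map following the given keys. It is a simplified
// stand-in for iterateRecursive and only descends through
// map[string]interface{} values.
func lookup(m map[string]interface{}, keys []string) (interface{}, error) {
	if len(keys) == 0 {
		return nil, errors.New("empty key path")
	}
	v, ok := m[keys[0]]
	if !ok {
		return nil, fmt.Errorf("key %q not found", keys[0])
	}
	if len(keys) == 1 {
		return v, nil
	}
	next, ok := v.(map[string]interface{})
	if !ok {
		return nil, fmt.Errorf("value at %q is not a map", keys[0])
	}
	return lookup(next, keys[1:])
}

// resolve picks a context by the first key of the expression and looks up the
// remaining keys inside it.
func resolve(contexts map[string]map[string]interface{}, expression string) (string, error) {
	keys := strings.Split(expression, ".")
	ctx, ok := contexts[keys[0]]
	if !ok {
		if len(keys) == 1 {
			// No dots and no matching context: treat it as a hard-coded value.
			return expression, nil
		}
		return "", fmt.Errorf("context value not supported: %q in %q", keys[0], expression)
	}
	v, err := lookup(ctx, keys[1:])
	if err != nil {
		return "", err
	}
	return fmt.Sprint(v), nil
}

func main() {
	// json.Unmarshal produces nested map[string]interface{} values, so the
	// type assertion in lookup matches them.
	var body map[string]interface{}
	if err := json.Unmarshal([]byte(`{"exportId":"2212","files":[{"id":"1"}]}`), &body); err != nil {
		panic(err)
	}
	contexts := map[string]map[string]interface{}{
		"parent_last_response": {"body": body},
	}
	v, err := resolve(contexts, "parent_last_response.body.exportId")
	fmt.Println(v, err)
}
```

A type assertion to a named map type does not match a value whose dynamic type is the unnamed `map[string]interface{}`, which is presumably why `iterateRecursive` now asserts `map[string]interface{}` instead of `mapstr.M`.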

func responseToMap(r *response, mapBody bool) (mapstr.M, error) {
func responseToMap(r *response) (mapstr.M, error) {
if r.body == nil {
return nil, fmt.Errorf("response body is empty for request url: %s", &r.url)
}
Expand All @@ -139,16 +157,7 @@ func responseToMap(r *response, mapBody bool) (mapstr.M, error) {
key: value,
}
}
if mapBody {
var bodyMap mapstr.M
err := json.Unmarshal(r.body.([]byte), &bodyMap)
if err != nil {
return nil, err
}
respMap["body"] = bodyMap
} else {
respMap["body"] = r.body
}
respMap["body"] = r.body

return respMap, nil
}
Expand All @@ -172,7 +181,7 @@ func iterateRecursive(m mapstr.M, keys []string, depth int) (interface{}, error)
case reflect.String:
return v.String(), nil
case reflect.Map:
nextMap, ok := v.Interface().(mapstr.M)
nextMap, ok := v.Interface().(map[string]interface{})
if !ok {
return nil, errors.New("unable to parse the value of the given expression")
}
4 changes: 3 additions & 1 deletion x-pack/filebeat/input/httpjson/transform.go
Expand Up @@ -27,6 +27,7 @@ type transforms []transform
type transformContext struct {
lock sync.RWMutex
cursor *cursor
parentTrCtx *transformContext
firstEvent *mapstr.M
lastEvent *mapstr.M
lastResponse *response
Expand Down Expand Up @@ -91,15 +92,16 @@ func (ctx *transformContext) updateCursor() {

func (ctx *transformContext) clone() *transformContext {
ctx.lock.Lock()
defer ctx.lock.Unlock()

newCtx := emptyTransformContext()
newCtx.lastEvent = ctx.lastEvent
newCtx.firstEvent = ctx.firstEvent
newCtx.lastResponse = ctx.lastResponse
newCtx.firstResponse = ctx.firstResponse
newCtx.cursor = ctx.cursor
newCtx.parentTrCtx = ctx

ctx.lock.Unlock()
return newCtx
}
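
The effect of keeping a parent pointer on the cloned context can be shown with a reduced sketch. The types below are simplified stand-ins with hypothetical names (`parent`, `parentLastResponse`), not the real `transformContext`, and the clone takes a read lock where the diff takes the write lock.

```go
package main

import (
	"fmt"
	"sync"
)

// response is a reduced stand-in holding only a decoded body.
type response struct {
	body map[string]interface{}
}

// transformContext is a reduced stand-in for the httpjson type of the same
// name; only the fields needed for this illustration are kept.
type transformContext struct {
	lock         sync.RWMutex
	parent       *transformContext
	lastResponse *response
}

// clone returns a child context that starts from the parent's last response
// and remembers the parent it was derived from.
func (c *transformContext) clone() *transformContext {
	c.lock.RLock()
	defer c.lock.RUnlock()
	return &transformContext{parent: c, lastResponse: c.lastResponse}
}

// updateLastResponse replaces the last response of this context only.
func (c *transformContext) updateLastResponse(r *response) {
	c.lock.Lock()
	defer c.lock.Unlock()
	c.lastResponse = r
}

// parentLastResponse returns the parent's last response, or nil when the
// context has no parent (that is, it is not a chain context).
func (c *transformContext) parentLastResponse() *response {
	if c.parent == nil {
		return nil
	}
	c.parent.lock.RLock()
	defer c.parent.lock.RUnlock()
	return c.parent.lastResponse
}

func main() {
	root := &transformContext{}
	root.updateLastResponse(&response{body: map[string]interface{}{"exportId": "2212"}})

	// A chain step works on its own clone, so its responses do not overwrite
	// the root context.
	child := root.clone()
	child.updateLastResponse(&response{body: map[string]interface{}{"hello": "world"}})

	fmt.Println(root.lastResponse.body["exportId"])
	fmt.Println(child.parentLastResponse().body["exportId"])
	fmt.Println(child.lastResponse.body["hello"])
}
```

Because the child holds a pointer rather than a copy, it sees the parent's last response as it is at read time, which fits the note that `.parent_last_response.` is meant for root requests that paginate.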

4 changes: 4 additions & 0 deletions x-pack/filebeat/input/httpjson/value_tpl.go
Expand Up @@ -119,6 +119,10 @@ func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, defaultVal
if trCtx.firstResponse != nil {
data.Put("first_response", trCtx.firstResponseClone().templateValues())
}
// This is only set when chaining is used
if trCtx.parentTrCtx != nil {
data.Put("parent_last_response", trCtx.parentTrCtx.lastResponseClone().templateValues())
}

if err := t.Template.Execute(buf, data); err != nil {
return fallback(err)
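
How a value template such as `[[ .parent_last_response.body.exportId ]]` resolves against the template data can be sketched with the standard `text/template` package. This is an illustration only: `render` is a hypothetical helper, the data is a hand-built nested map, and the `missingkey=error` option is an assumption made here so that a missing context fails loudly; it is not taken from the diff.

```go
package main

import (
	"bytes"
	"fmt"
	"text/template"
)

// render executes a value template that uses the [[ ]] delimiters against the
// given data and returns the rendered string.
func render(tpl string, data map[string]interface{}) (string, error) {
	t, err := template.New("value").
		Delims("[[", "]]").
		Option("missingkey=error").
		Parse(tpl)
	if err != nil {
		return "", err
	}
	var buf bytes.Buffer
	if err := t.Execute(&buf, data); err != nil {
		return "", err
	}
	return buf.String(), nil
}

func main() {
	// In a chain step the data would carry the parent's last response under
	// the "parent_last_response" key; it is hard-coded here.
	data := map[string]interface{}{
		"parent_last_response": map[string]interface{}{
			"body": map[string]interface{}{"exportId": "2212"},
		},
	}
	out, err := render("[[ .parent_last_response.body.exportId ]]", data)
	fmt.Println(out, err)
}
```

When the key is absent, as it would be outside a chain where no parent context exists, `render` returns an error instead of an empty value.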