diff --git a/apisix/plugins/http-logger.lua b/apisix/plugins/http-logger.lua index 3d2e7f16a4bf..b52c1adcbae3 100644 --- a/apisix/plugins/http-logger.lua +++ b/apisix/plugins/http-logger.lua @@ -44,6 +44,17 @@ local schema = { inactive_timeout = {type = "integer", minimum = 1, default = 5}, batch_max_size = {type = "integer", minimum = 1, default = 1000}, include_req_body = {type = "boolean", default = false}, + include_resp_body = {type = "boolean", default = false}, + include_resp_body_expr = { + type = "array", + minItems = 1, + items = { + type = "array", + items = { + type = "string" + } + } + }, concat_method = {type = "string", default = "json", enum = {"json", "new_line"}} }, @@ -72,7 +83,12 @@ function _M.check_schema(conf, schema_type) if schema_type == core.schema.TYPE_METADATA then return core.schema.check(metadata_schema, conf) end - return core.schema.check(schema, conf) + + local ok, err = core.schema.check(schema, conf) + if not ok then + return nil, err + end + return log_util.check_log_schema(conf) end @@ -162,6 +178,11 @@ local function remove_stale_objects(premature) end +function _M.body_filter(conf, ctx) + log_util.collect_body(conf, ctx) +end + + function _M.log(conf, ctx) local metadata = plugin.plugin_metadata(plugin_name) core.log.info("metadata: ", core.json.delay_encode(metadata)) diff --git a/apisix/plugins/kafka-logger.lua b/apisix/plugins/kafka-logger.lua index f045c3958e15..def042feb2be 100644 --- a/apisix/plugins/kafka-logger.lua +++ b/apisix/plugins/kafka-logger.lua @@ -19,7 +19,6 @@ local log_util = require("apisix.utils.log-util") local producer = require ("resty.kafka.producer") local batch_processor = require("apisix.utils.batch-processor") local plugin = require("apisix.plugin") -local expr = require("resty.expr.v1") local math = math local pairs = pairs @@ -85,6 +84,17 @@ local schema = { } } }, + include_resp_body = {type = "boolean", default = false}, + include_resp_body_expr = { + type = "array", + minItems = 1, + items = { + type = "array", + items = { + type = "string" + } + } + }, -- in lua-resty-kafka, cluster_name is defined as number -- see https://github.com/doujiang24/lua-resty-kafka#new-1 cluster_name = {type = "integer", minimum = 1, default = 1}, @@ -109,19 +119,15 @@ local _M = { function _M.check_schema(conf, schema_type) - - if conf.include_req_body_expr then - local ok, err = expr.new(conf.include_req_body_expr) - if not ok then - return nil, - {error_msg = "failed to validate the 'include_req_body_expr' expression: " .. err} - end - end - if schema_type == core.schema.TYPE_METADATA then return core.schema.check(metadata_schema, conf) end - return core.schema.check(schema, conf) + + local ok, err = core.schema.check(schema, conf) + if not ok then + return nil, err + end + return log_util.check_log_schema(conf) end @@ -191,6 +197,11 @@ local function send_kafka_data(conf, log_message, prod) end +function _M.body_filter(conf, ctx) + log_util.collect_body(conf, ctx) +end + + function _M.log(conf, ctx) local entry if conf.meta_format == "origin" then diff --git a/apisix/utils/log-util.lua b/apisix/utils/log-util.lua index 3f82b920c26e..3f268aa8c306 100644 --- a/apisix/utils/log-util.lua +++ b/apisix/utils/log-util.lua @@ -123,6 +123,10 @@ local function get_full_log(ngx, conf) latency = (ngx_now() - ngx.req.start_time()) * 1000 } + if ctx.resp_body then + log.response.body = ctx.resp_body + end + if conf.include_req_body then local log_request_body = true @@ -132,7 +136,7 @@ local function get_full_log(ngx, conf) if not conf.request_expr then local request_expr, err = expr.new(conf.include_req_body_expr) if not request_expr then - core.log.error('generate log expr err ' .. err) + core.log.error('generate request expr err ' .. err) return log end conf.request_expr = request_expr @@ -201,6 +205,57 @@ function _M.latency_details_in_ms(ctx) end +function _M.check_log_schema(conf) + if conf.include_req_body_expr then + local ok, err = expr.new(conf.include_req_body_expr) + if not ok then + return nil, "failed to validate the 'include_req_body_expr' expression: " .. err + end + end + if conf.include_resp_body_expr then + local ok, err = expr.new(conf.include_resp_body_expr) + if not ok then + return nil, "failed to validate the 'include_resp_body_expr' expression: " .. err + end + end + return true, nil +end + + +function _M.collect_body(conf, ctx) + if conf.include_resp_body then + local log_response_body = true + + if conf.include_resp_body_expr then + if not conf.response_expr then + local response_expr, err = expr.new(conf.include_resp_body_expr) + if not response_expr then + core.log.error('generate response expr err ' .. err) + return + end + conf.response_expr = response_expr + end + + if ctx.res_expr_eval_result == nil then + ctx.res_expr_eval_result = conf.response_expr:eval(ctx.var) + end + + if not ctx.res_expr_eval_result then + log_response_body = false + end + end + + if log_response_body then + local final_body = core.response.hold_body_chunk(ctx, true) + if not final_body then + return + end + ctx.resp_body = final_body + end + end +end + + function _M.get_rfc3339_zulu_timestamp(timestamp) ngx_update_time() local now = timestamp or ngx_now() diff --git a/docs/en/latest/plugins/http-logger.md b/docs/en/latest/plugins/http-logger.md index acc0bc0ad728..c4f5056aa02e 100644 --- a/docs/en/latest/plugins/http-logger.md +++ b/docs/en/latest/plugins/http-logger.md @@ -49,7 +49,9 @@ This will provide the ability to send Log data requests as JSON objects to Monit | buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.| | max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. | | retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. | -| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. | +| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. | +| include_resp_body| boolean | optional | false | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. | +| include_resp_body_expr | array | optional | | | When `include_resp_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the response body when the result is true. | | concat_method | string | optional | "json" | ["json", "new_line"] | Enum type: `json` and `new_line`. **json**: use `json.encode` for all pending logs. **new_line**: use `json.encode` for each pending log and concat them with "\n" line. | ## How To Enable diff --git a/docs/en/latest/plugins/kafka-logger.md b/docs/en/latest/plugins/kafka-logger.md index 9aa3d92559a7..504aa116cdc6 100644 --- a/docs/en/latest/plugins/kafka-logger.md +++ b/docs/en/latest/plugins/kafka-logger.md @@ -58,6 +58,8 @@ For more info on Batch-Processor in Apache APISIX please refer. | retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. | | include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. | | include_req_body_expr | array | optional | | | When `include_req_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the request body when the result is true. | +| include_resp_body| boolean | optional | false | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. | +| include_resp_body_expr | array | optional | | | When `include_resp_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the response body when the result is true. | | cluster_name | integer | optional | 1 | [0,...] | the name of the cluster. When there are two or more kafka clusters, you can specify different names. And this only works with async producer_type.| ### examples of meta_format diff --git a/docs/zh/latest/plugins/http-logger.md b/docs/zh/latest/plugins/http-logger.md index 7ea4fd79180b..b253355d4fcf 100644 --- a/docs/zh/latest/plugins/http-logger.md +++ b/docs/zh/latest/plugins/http-logger.md @@ -50,6 +50,8 @@ title: http-logger | max_retry_count | integer | 可选 | 0 | [0,...] | 从处理管道中移除之前的最大重试次数。 | | retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 | | include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ; true: 表示包含请求的 body 。 | +| include_resp_body| boolean | 可选 | false | [false, true] | 是否包括响应体。包含响应体,当为`true`。 | +| include_resp_body_expr | array | 可选 | | | 是否采集响体, 基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 该选项需要开启 `include_resp_body`| | concat_method | string | 可选 | "json" | ["json", "new_line"] | 枚举类型: `json`、`new_line`。**json**: 对所有待发日志使用 `json.encode` 编码。**new_line**: 对每一条待发日志单独使用 `json.encode` 编码并使用 "\n" 连接起来。 | ## 如何开启 diff --git a/docs/zh/latest/plugins/kafka-logger.md b/docs/zh/latest/plugins/kafka-logger.md index d51158a7c6bf..85955330c646 100644 --- a/docs/zh/latest/plugins/kafka-logger.md +++ b/docs/zh/latest/plugins/kafka-logger.md @@ -58,6 +58,8 @@ title: kafka-logger | retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 | | include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ;true: 表示包含请求的 body。注意:如果请求 body 没办法完全放在内存中,由于 Nginx 的限制,我们没有办法把它记录下来。| | include_req_body_expr | array | 可选 | | | 当 `include_req_body` 开启时, 基于 [lua-resty-expr](https://github.com/api7/lua-resty-expr) 表达式的结果进行记录。如果该选项存在,只有在表达式为真的时候才会记录请求 body。 | +| include_resp_body| boolean | 可选 | false | [false, true] | 是否包括响应体。包含响应体,当为`true`。 | +| include_resp_body_expr | array | 可选 | | | 是否采集响体, 基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 该选项需要开启 `include_resp_body`| | cluster_name | integer | 可选 | 1 | [0,...] | kafka 集群的名称。当有两个或多个 kafka 集群时,可以指定不同的名称。只适用于 producer_type 是 async 模式。| ### meta_format 参考示例 diff --git a/t/plugin/http-logger-json.t b/t/plugin/http-logger-json.t index ed727c2a7377..9787165532e4 100644 --- a/t/plugin/http-logger-json.t +++ b/t/plugin/http-logger-json.t @@ -42,7 +42,7 @@ run_tests; __DATA__ -=== TEST 1: json body +=== TEST 1: json body with request_body --- apisix_yaml routes: - @@ -62,3 +62,126 @@ POST /hello {"sample_payload":"hello"} --- error_log "body":"{\"sample_payload\":\"hello\"}" + + + +=== TEST 2: json body with response_body +--- apisix_yaml +routes: + - + uri: /hello + upstream: + nodes: + "127.0.0.1:1980": 1 + type: roundrobin + plugins: + http-logger: + batch_max_size: 1 + uri: http://127.0.0.1:1980/log + include_resp_body: true +#END +--- request +POST /hello +{"sample_payload":"hello"} +--- error_log +"response":{"body":"hello world\n" + + + +=== TEST 3: json body with response_body and response_body expression +--- apisix_yaml +routes: + - + uri: /hello + upstream: + nodes: + "127.0.0.1:1980": 1 + type: roundrobin + plugins: + http-logger: + batch_max_size: 1 + uri: http://127.0.0.1:1980/log + include_resp_body: true + include_resp_body_expr: + - - arg_bar + - == + - foo +#END +--- request +POST /hello?bar=foo +{"sample_payload":"hello"} +--- error_log +"response":{"body":"hello world\n" + + + +=== TEST 4: json body with response_body, expr not hit +--- apisix_yaml +routes: + - + uri: /hello + upstream: + nodes: + "127.0.0.1:1980": 1 + type: roundrobin + plugins: + http-logger: + batch_max_size: 1 + uri: http://127.0.0.1:1980/log + include_resp_body: true + include_resp_body_expr: + - - arg_bar + - == + - foo +#END +--- request +POST /hello?bar=bar +{"sample_payload":"hello"} +--- no_error_log +"response":{"body":"hello world\n" + + + +=== TEST 5: json body with request_body and response_body +--- apisix_yaml +routes: + - + uri: /hello + upstream: + nodes: + "127.0.0.1:1980": 1 + type: roundrobin + plugins: + http-logger: + batch_max_size: 1 + uri: http://127.0.0.1:1980/log + include_req_body: true + include_resp_body: true +#END +--- request +POST /hello +{"sample_payload":"hello"} +--- error_log eval +qr/(.*"response":\{.*"body":"hello world\\n".*|.*\{\\\"sample_payload\\\":\\\"hello\\\"\}.*){2}/ + + + +=== TEST 6: json body without request_body or response_body +--- apisix_yaml +routes: + - + uri: /hello + upstream: + nodes: + "127.0.0.1:1980": 1 + type: roundrobin + plugins: + http-logger: + batch_max_size: 1 + uri: http://127.0.0.1:1980/log +#END +--- request +POST /hello +{"sample_payload":"hello"} +--- error_log eval +qr/(.*"response":\{.*"body":"hello world\\n".*|.*\{\\\"sample_payload\\\":\\\"hello\\\"\}.*){0}/ diff --git a/t/plugin/http-logger.t b/t/plugin/http-logger.t index 1dd012217394..9dd85db18b81 100644 --- a/t/plugin/http-logger.t +++ b/t/plugin/http-logger.t @@ -784,3 +784,39 @@ qr/sending a batch logs to http:\/\/127.0.0.1:1982\/hello\d?/ --- grep_error_log_out sending a batch logs to http://127.0.0.1:1982/hello sending a batch logs to http://127.0.0.1:1982/hello1 + + + +=== TEST 18: check log schema(include_resp_body_expr) +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.http-logger") + local ok, err = plugin.check_schema({uri = "http://127.0.0.1", + auth_header = "Basic 123", + timeout = 3, + name = "http-logger", + max_retry_count = 2, + retry_delay = 2, + buffer_duration = 2, + inactive_timeout = 2, + batch_max_size = 500, + include_resp_body = true, + include_resp_body_expr = { + {"bar", "<>", "foo"} + } + }) + if not ok then + ngx.say(err) + end + + ngx.say("done") + } + } +--- request +GET /t +--- response_body +failed to validate the 'include_resp_body_expr' expression: invalid operator '<>' +done +--- no_error_log +[error] diff --git a/t/plugin/kafka-logger.t b/t/plugin/kafka-logger.t index 5094910f37ea..42277c6f1301 100644 --- a/t/plugin/kafka-logger.t +++ b/t/plugin/kafka-logger.t @@ -1193,3 +1193,146 @@ hello world --- no_error_log eval qr/send data to kafka: \{.*"body":"abcdef"/ --- wait: 2 + + + +=== TEST 29: check log schema(include_req_body) +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + kafka_topic = "test", + key = "key1", + broker_list = { + ["127.0.0.1"] = 3 + }, + include_req_body = true, + include_req_body_expr = { + {"bar", "<>", "foo"} + } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +failed to validate the 'include_req_body_expr' expression: invalid operator '<>' +done +--- no_error_log +[error] + + + +=== TEST 30: check log schema(include_resp_body) +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.kafka-logger") + local ok, err = plugin.check_schema({ + kafka_topic = "test", + key = "key1", + broker_list = { + ["127.0.0.1"] = 3 + }, + include_resp_body = true, + include_resp_body_expr = { + {"bar", "", "foo"} + } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +failed to validate the 'include_resp_body_expr' expression: invalid operator '' +done +--- no_error_log +[error] + + + +=== TEST 31: set route(id: 1,include_resp_body = true,include_resp_body_expr = array) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "kafka-logger": { + "broker_list" : + { + "127.0.0.1":9092 + }, + "kafka_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "include_resp_body": true, + "include_resp_body_expr": [ + [ + "arg_name", + "==", + "qwerty" + ] + ], + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 32: hit route, expr eval success +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log eval +qr/send data to kafka: \{.*"body":"hello world\\n"/ +--- wait: 2 + + + +=== TEST 33: hit route,expr eval fail +--- request +POST /hello?name=zcxv +abcdef +--- response_body +hello world +--- no_error_log eval +qr/send data to kafka: \{.*"body":"hello world\\n"/ +--- wait: 2