diff --git a/README.md b/README.md index abce5e2a5733..f686a34171d0 100644 --- a/README.md +++ b/README.md @@ -134,7 +134,7 @@ A/B testing, canary release, blue-green deployment, limit rate, defense against - High performance: The single-core QPS reaches 18k with an average delay of fewer than 0.2 milliseconds. - [Fault Injection](docs/en/latest/plugins/fault-injection.md) - [REST Admin API](docs/en/latest/admin-api.md): Using the REST Admin API to control Apache APISIX, which only allows 127.0.0.1 access by default, you can modify the `allow_admin` field in `conf/config.yaml` to specify a list of IPs that are allowed to call the Admin API. Also, note that the Admin API uses key auth to verify the identity of the caller. **The `admin_key` field in `conf/config.yaml` needs to be modified before deployment to ensure security**. - - External Loggers: Export access logs to external log management tools. ([HTTP Logger](docs/en/latest/plugins/http-logger.md), [TCP Logger](docs/en/latest/plugins/tcp-logger.md), [Kafka Logger](docs/en/latest/plugins/kafka-logger.md), [UDP Logger](docs/en/latest/plugins/udp-logger.md), [Google Cloud Logging](docs/en/latest/plugins/google-cloud-logging.md)) + - External Loggers: Export access logs to external log management tools. ([HTTP Logger](docs/en/latest/plugins/http-logger.md), [TCP Logger](docs/en/latest/plugins/tcp-logger.md), [Kafka Logger](docs/en/latest/plugins/kafka-logger.md), [UDP Logger](docs/en/latest/plugins/udp-logger.md), [Google Cloud Logging](docs/en/latest/plugins/google-cloud-logging.md), [RocketMQ Logger](docs/en/latest/plugins/rocketmq-logger.md)) - [Datadog](docs/en/latest/plugins/datadog.md): push custom metrics to the DogStatsD server, comes bundled with [Datadog agent](https://docs.datadoghq.com/agent/), over the UDP protocol. DogStatsD basically is an implementation of StatsD protocol which collects the custom metrics for Apache APISIX agent, aggregates it into a single data point and sends it to the configured Datadog server. - [Helm charts](https://github.com/apache/apisix-helm-chart) diff --git a/apisix/plugins/rocketmq-logger.lua b/apisix/plugins/rocketmq-logger.lua new file mode 100644 index 000000000000..662caea2a798 --- /dev/null +++ b/apisix/plugins/rocketmq-logger.lua @@ -0,0 +1,271 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local log_util = require("apisix.utils.log-util") +local producer = require ("resty.rocketmq.producer") +local acl_rpchook = require("resty.rocketmq.acl_rpchook") +local batch_processor = require("apisix.utils.batch-processor") +local plugin = require("apisix.plugin") + +local pairs = pairs +local type = type +local ipairs = ipairs +local plugin_name = "rocketmq-logger" +local stale_timer_running = false +local timer_at = ngx.timer.at +local ngx = ngx +local buffers = {} + +local lrucache = core.lrucache.new({ + type = "plugin", +}) + +local schema = { + type = "object", + properties = { + meta_format = { + type = "string", + default = "default", + enum = {"default", "origin"}, + }, + nameserver_list = { + type = "object", + minProperties = 1, + patternProperties = { + [".*"] = { + description = "the port of rocketmq nameserver", + type = "integer", + minimum = 1, + maximum = 65535, + }, + }, + }, + rocketmq_topic = {type = "string"}, + key = {type = "string"}, + tag = {type = "string"}, + timeout = {type = "integer", minimum = 1, default = 3}, + use_tls = {type = "boolean", default = false}, + access_key = {type = "string", default = ""}, + secret_key = {type = "string", default = ""}, + name = {type = "string", default = "rocketmq logger"}, + max_retry_count = {type = "integer", minimum = 0, default = 0}, + retry_delay = {type = "integer", minimum = 0, default = 1}, + buffer_duration = {type = "integer", minimum = 1, default = 60}, + inactive_timeout = {type = "integer", minimum = 1, default = 5}, + include_req_body = {type = "boolean", default = false}, + include_req_body_expr = { + type = "array", + minItems = 1, + items = { + type = "array", + items = { + type = "string" + } + } + }, + include_resp_body = {type = "boolean", default = false}, + include_resp_body_expr = { + type = "array", + minItems = 1, + items = { + type = "array", + items = { + type = "string" + } + } + }, + }, + required = {"nameserver_list", "rocketmq_topic"} +} + +local metadata_schema = { + type = "object", + properties = { + log_format = log_util.metadata_schema_log_format, + }, +} + +local _M = { + version = 0.1, + priority = 402, + name = plugin_name, + schema = schema, + metadata_schema = metadata_schema, +} + + +function _M.check_schema(conf, schema_type) + if schema_type == core.schema.TYPE_METADATA then + return core.schema.check(metadata_schema, conf) + end + + local ok, err = core.schema.check(schema, conf) + if not ok then + return nil, err + end + return log_util.check_log_schema(conf) +end + + +-- remove stale objects from the memory after timer expires +local function remove_stale_objects(premature) + if premature then + return + end + + for key, batch in ipairs(buffers) do + if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then + core.log.warn("removing batch processor stale object, conf: ", + core.json.delay_encode(key)) + buffers[key] = nil + end + end + + stale_timer_running = false +end + + +local function create_producer(nameserver_list, producer_config) + core.log.info("create new rocketmq producer instance") + local prod = producer.new(nameserver_list, "apisixLogProducer") + if producer_config.use_tls then + prod:setUseTLS(true) + end + if producer_config.access_key ~= '' then + local aclHook = acl_rpchook.new(producer_config.access_key, producer_config.secret_key) + prod:addRPCHook(aclHook) + end + prod:setTimeout(producer_config.timeout) + return prod +end + + +local function send_rocketmq_data(conf, log_message, prod) + local result, err = prod:send(conf.rocketmq_topic, log_message, conf.tag, conf.key) + if not result then + return false, "failed to send data to rocketmq topic: " .. err .. + ", nameserver_list: " .. core.json.encode(conf.nameserver_list) + end + + core.log.info("queue: ", result.sendResult.messageQueue.queueId) + + return true +end + + +function _M.body_filter(conf, ctx) + log_util.collect_body(conf, ctx) +end + + +function _M.log(conf, ctx) + local entry + if conf.meta_format == "origin" then + entry = log_util.get_req_original(ctx, conf) + -- core.log.info("origin entry: ", entry) + + else + local metadata = plugin.plugin_metadata(plugin_name) + core.log.info("metadata: ", core.json.delay_encode(metadata)) + if metadata and metadata.value.log_format + and core.table.nkeys(metadata.value.log_format) > 0 + then + entry = log_util.get_custom_format_log(ctx, metadata.value.log_format) + core.log.info("custom log format entry: ", core.json.delay_encode(entry)) + else + entry = log_util.get_full_log(ngx, conf) + core.log.info("full log entry: ", core.json.delay_encode(entry)) + end + end + + if not stale_timer_running then + -- run the timer every 30 mins if any log is present + timer_at(1800, remove_stale_objects) + stale_timer_running = true + end + + local log_buffer = buffers[conf] + if log_buffer then + log_buffer:push(entry) + return + end + + -- reuse producer via lrucache to avoid unbalanced partitions of messages in rocketmq + local nameserver_list = core.table.new(core.table.nkeys(conf.nameserver_list), 0) + + for host, port in pairs(conf.nameserver_list) do + local nameserver = host .. ':' .. port + core.table.insert(nameserver_list, nameserver) + end + + local producer_config = { + timeout = conf.timeout * 1000, + use_tls = conf.use_tls, + access_key = conf.access_key, + secret_key = conf.secret_key, + } + + local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer, + nameserver_list, producer_config) + if err then + return nil, "failed to create the rocketmq producer: " .. err + end + core.log.info("rocketmq nameserver_list[1] port ", + prod.client.nameservers[1].port) + -- Generate a function to be executed by the batch processor + local func = function(entries, batch_max_size) + local data, err + if batch_max_size == 1 then + data = entries[1] + if type(data) ~= "string" then + data, err = core.json.encode(data) -- encode as single {} + end + else + data, err = core.json.encode(entries) -- encode as array [{}] + end + + if not data then + return false, 'error occurred while encoding the data: ' .. err + end + + core.log.info("send data to rocketmq: ", data) + return send_rocketmq_data(conf, data, prod) + end + + local config = { + name = conf.name, + retry_delay = conf.retry_delay, + batch_max_size = conf.batch_max_size, + max_retry_count = conf.max_retry_count, + buffer_duration = conf.buffer_duration, + inactive_timeout = conf.inactive_timeout, + } + + local err + log_buffer, err = batch_processor:new(func, config) + + if not log_buffer then + core.log.error("error when creating the batch processor: ", err) + return + end + + buffers[conf] = log_buffer + log_buffer:push(entry) +end + + +return _M diff --git a/conf/config-default.yaml b/conf/config-default.yaml index dc42e04bea5c..5c77dbc9f531 100644 --- a/conf/config-default.yaml +++ b/conf/config-default.yaml @@ -349,6 +349,7 @@ plugins: # plugin list (sorted by priority) - sls-logger # priority: 406 - tcp-logger # priority: 405 - kafka-logger # priority: 403 + - rocketmq-logger # priority: 402 - syslog # priority: 401 - udp-logger # priority: 400 #- log-rotate # priority: 100 diff --git a/docs/en/latest/config.json b/docs/en/latest/config.json index d40a0ded540d..04c90171de10 100644 --- a/docs/en/latest/config.json +++ b/docs/en/latest/config.json @@ -116,6 +116,7 @@ "plugins/skywalking-logger", "plugins/tcp-logger", "plugins/kafka-logger", + "plugins/rocketmq-logger", "plugins/udp-logger", "plugins/syslog", "plugins/log-rotate", diff --git a/docs/en/latest/plugins/rocketmq-logger.md b/docs/en/latest/plugins/rocketmq-logger.md new file mode 100644 index 000000000000..3f660b4b1fa7 --- /dev/null +++ b/docs/en/latest/plugins/rocketmq-logger.md @@ -0,0 +1,237 @@ +--- +title: rocketmq-logger +--- + + + +## Summary + +- [**Name**](#name) +- [**Attributes**](#attributes) +- [**Info**](#info) +- [**How To Enable**](#how-to-enable) +- [**Test Plugin**](#test-plugin) +- [**Disable Plugin**](#disable-plugin) + +## Name + +`rocketmq-logger` is a plugin which works as a rocketmq client driver for the ngx_lua nginx module. + +This plugin provides the ability to push requests log data as JSON objects to your external rocketmq clusters. In case if you did not receive the log data don't worry give it some time it will automatically send the logs after the timer function expires in our Batch Processor. + +For more info on Batch-Processor in Apache APISIX please refer. +[Batch-Processor](../batch-processor.md) + +## Attributes + +| Name | Type | Requirement | Default | Valid | Description | +| ---------------- | ------- | ----------- | -------------- | ------- | ---------------------------------------------------------------------------------------- | +| nameserver_list | object | required | | | An array of rocketmq nameservers. | +| rocketmq_topic | string | required | | | Target topic to push data. | +| key | string | optional | | | Keys of messages to send. | +| tag | string | optional | | | Tags of messages to send. | +| timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to send data. | +| use_tls | boolean | optional | false | | Whether to open TLS | +| access_key | string | optional | "" | | access key for ACL, empty string means disable ACL. | +| secret_key | string | optional | "" | | secret key for ACL。 | +| name | string | optional | "rocketmq logger" | | A unique identifier to identity the batch processor. | +| meta_format | enum | optional | "default" | ["default","origin"] | `default`: collect the request information with default JSON way. `origin`: collect the request information with original HTTP request. [example](#examples-of-meta_format)| +| batch_max_size | integer | optional | 1000 | [1,...] | Set the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the `rocketmq` service. | +| inactive_timeout | integer | optional | 5 | [1,...] | The maximum time to refresh the buffer (in seconds). When the maximum refresh time is reached, all logs will be automatically pushed to the `rocketmq` service regardless of whether the number of logs in the buffer reaches the set maximum number. | +| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.| +| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. | +| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. | +| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. | +| include_req_body_expr | array | optional | | | When `include_req_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the request body when the result is true. | +| include_resp_body| boolean | optional | false | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. | +| include_resp_body_expr | array | optional | | | When `include_resp_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the response body when the result is true. | + +### examples of meta_format + +- **default**: + + ```json + { + "upstream": "127.0.0.1:1980", + "start_time": 1619414294760, + "client_ip": "127.0.0.1", + "service_id": "", + "route_id": "1", + "request": { + "querystring": { + "ab": "cd" + }, + "size": 90, + "uri": "/hello?ab=cd", + "url": "http://localhost:1984/hello?ab=cd", + "headers": { + "host": "localhost", + "content-length": "6", + "connection": "close" + }, + "body": "abcdef", + "method": "GET" + }, + "response": { + "headers": { + "connection": "close", + "content-type": "text/plain; charset=utf-8", + "date": "Mon, 26 Apr 2021 05:18:14 GMT", + "server": "APISIX/2.5", + "transfer-encoding": "chunked" + }, + "size": 190, + "status": 200 + }, + "server": { + "hostname": "localhost", + "version": "2.5" + }, + "latency": 0 + } + ``` + +- **origin**: + + ```http + GET /hello?ab=cd HTTP/1.1 + host: localhost + content-length: 6 + connection: close + + abcdef + ``` + +## Info + +The `message` will write to the buffer first. +It will send to the rocketmq server when the buffer exceed the `batch_max_size`, +or every `buffer_duration` flush the buffer. + +In case of success, returns `true`. +In case of errors, returns `nil` with a string describing the error (`buffer overflow`). + +### Sample Nameserver list + +Specify the nameservers of the external rocketmq servers as below sample. + +```json +{ + "127.0.0.1":9876, + "127.0.0.2":9876 +} +``` + +## How To Enable + +The following is an example on how to enable the rocketmq-logger for a specific route. + +```shell +curl http://127.0.0.1:9080/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "batch_max_size": 1, + "name": "rocketmq logger" + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" +}' +``` + +## Test Plugin + +success: + +```shell +$ curl -i http://127.0.0.1:9080/hello +HTTP/1.1 200 OK +... +hello, world +``` + +## Metadata + +| Name | Type | Requirement | Default | Valid | Description | +| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- | +| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as key value pair in JSON format. Only string is supported in the `value` part. If the value starts with `$`, it means to get `APISIX` variables or [Nginx variable](http://nginx.org/en/docs/varindex.html). | + + Note that **the metadata configuration is applied in global scope**, which means it will take effect on all Route or Service which use rocketmq-logger plugin. + +**APISIX Variables** + +| Variable Name | Description | Usage Example | +|------------------|-------------------------|----------------| +| route_id | id of `route` | $route_id | +| route_name | name of `route` | $route_name | +| service_id | id of `service` | $service_id | +| service_name | name of `service` | $service_name | +| consumer_name | username of `consumer` | $consumer_name | + +### Example + +```shell +curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/rocketmq-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "log_format": { + "host": "$host", + "@timestamp": "$time_iso8601", + "client_ip": "$remote_addr" + } +}' +``` + +It is expected to see some logs like that: + +```shell +{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"} +{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"} +``` + +## Disable Plugin + +Remove the corresponding json configuration in the plugin configuration to disable the `rocketmq-logger`. +APISIX plugins are hot-reloaded, therefore no need to restart APISIX. + +```shell +$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "methods": ["GET"], + "uri": "/hello", + "plugins": {}, + "upstream": { + "type": "roundrobin", + "nodes": { + "127.0.0.1:1980": 1 + } + } +}' +``` diff --git a/docs/zh/latest/README.md b/docs/zh/latest/README.md index c8a3a654cafa..d46eb94a851b 100644 --- a/docs/zh/latest/README.md +++ b/docs/zh/latest/README.md @@ -135,7 +135,7 @@ A/B 测试、金丝雀发布(灰度发布)、蓝绿部署、限流限速、抵 - 高性能:在单核上 QPS 可以达到 18k,同时延迟只有 0.2 毫秒。 - [故障注入](plugins/fault-injection.md) - [REST Admin API](admin-api.md): 使用 REST Admin API 来控制 Apache APISIX,默认只允许 127.0.0.1 访问,你可以修改 `conf/config.yaml` 中的 `allow_admin` 字段,指定允许调用 Admin API 的 IP 列表。同时需要注意的是,Admin API 使用 key auth 来校验调用者身份,**在部署前需要修改 `conf/config.yaml` 中的 `admin_key` 字段,来保证安全。** - - 外部日志记录器:将访问日志导出到外部日志管理工具。([HTTP Logger](plugins/http-logger.md), [TCP Logger](plugins/tcp-logger.md), [Kafka Logger](plugins/kafka-logger.md), [UDP Logger](plugins/udp-logger.md), [Google Cloud Logging](plugins/google-cloud-logging.md)) + - 外部日志记录器:将访问日志导出到外部日志管理工具。([HTTP Logger](plugins/http-logger.md), [TCP Logger](plugins/tcp-logger.md), [Kafka Logger](plugins/kafka-logger.md), [UDP Logger](plugins/udp-logger.md), [Google Cloud Logging](plugins/google-cloud-logging.md), [RocketMQ Logger](plugins/rocketmq-logger.md)) - [Helm charts](https://github.com/apache/apisix-helm-chart) - **高度可扩展** diff --git a/docs/zh/latest/config.json b/docs/zh/latest/config.json index 9a2eef66dd11..cd914b7a4700 100644 --- a/docs/zh/latest/config.json +++ b/docs/zh/latest/config.json @@ -114,6 +114,7 @@ "plugins/skywalking-logger", "plugins/tcp-logger", "plugins/kafka-logger", + "plugins/rocketmq-logger", "plugins/udp-logger", "plugins/syslog", "plugins/log-rotate", diff --git a/docs/zh/latest/plugins/rocketmq-logger.md b/docs/zh/latest/plugins/rocketmq-logger.md new file mode 100644 index 000000000000..389813d29c88 --- /dev/null +++ b/docs/zh/latest/plugins/rocketmq-logger.md @@ -0,0 +1,232 @@ +--- +title: rocketmq-logger +--- + + + +## 目录 + +- [**简介**](#简介) +- [**属性**](#属性) +- [**工作原理**](#工作原理) +- [**如何启用**](#如何启用) +- [**测试插件**](#测试插件) +- [**禁用插件**](#禁用插件) + +## 简介 + +`rocketmq-logger` 是一个插件,可用作ngx_lua nginx 模块的 rocketmq 客户端驱动程序。 + +它可以将接口请求日志以 JSON 的形式推送给外部 rocketmq 集群。如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。 + +有关 Apache APISIX 中 Batch-Processor 的更多信息,请参考。 +[Batch-Processor](../batch-processor.md) + +## 属性 + +| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 | +| ---------------- | ------- | ------ | -------------- | ------- | ------------------------------------------------ | +| nameserver_list | object | 必须 | | | 要推送的 rocketmq 的 nameserver 列表。 | +| rocketmq_topic | string | 必须 | | | 要推送的 topic。 | +| key | string | 可选 | | | 发送消息的keys。 | +| tag | string | 可选 | | | 发送消息的tags。 | +| timeout | integer | 可选 | 3 | [1,...] | 发送数据的超时时间。 | +| use_tls | boolean | 可选 | false | | 是否开启TLS加密。 | +| access_key | string | 可选 | "" | | ACL认证的access key,空字符串表示不开启ACL。 | +| secret_key | string | 可选 | "" | | ACL认证的secret key。 | +| name | string | 可选 | "rocketmq logger" | | batch processor 的唯一标识。 | +| meta_format | enum | 可选 | "default" | ["default","origin"] | `default`:获取请求信息以默认的 JSON 编码方式。`origin`:获取请求信息以 HTTP 原始请求方式。[具体示例](#meta_format-参考示例)| +| batch_max_size | integer | 可选 | 1000 | [1,...] | 设置每批发送日志的最大条数,当日志条数达到设置的最大值时,会自动推送全部日志到 `rocketmq` 服务。| +| inactive_timeout | integer | 可选 | 5 | [1,...] | 刷新缓冲区的最大时间(以秒为单位),当达到最大的刷新时间时,无论缓冲区中的日志数量是否达到设置的最大条数,也会自动将全部日志推送到 `rocketmq` 服务。 | +| buffer_duration | integer | 可选 | 60 | [1,...] | 必须先处理批次中最旧条目的最长期限(以秒为单位)。 | +| max_retry_count | integer | 可选 | 0 | [0,...] | 从处理管道中移除之前的最大重试次数。 | +| retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 | +| include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ;true: 表示包含请求的 body。注意:如果请求 body 没办法完全放在内存中,由于 Nginx 的限制,我们没有办法把它记录下来。| +| include_req_body_expr | array | 可选 | | | 当 `include_req_body` 开启时, 基于 [lua-resty-expr](https://github.com/api7/lua-resty-expr) 表达式的结果进行记录。如果该选项存在,只有在表达式为真的时候才会记录请求 body。 | +| include_resp_body| boolean | 可选 | false | [false, true] | 是否包括响应体。包含响应体,当为`true`。 | +| include_resp_body_expr | array | 可选 | | | 是否采集响体, 基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 该选项需要开启 `include_resp_body`| + +### meta_format 参考示例 + +- **default**: + + ```json + { + "upstream": "127.0.0.1:1980", + "start_time": 1619414294760, + "client_ip": "127.0.0.1", + "service_id": "", + "route_id": "1", + "request": { + "querystring": { + "ab": "cd" + }, + "size": 90, + "uri": "/hello?ab=cd", + "url": "http://localhost:1984/hello?ab=cd", + "headers": { + "host": "localhost", + "content-length": "6", + "connection": "close" + }, + "body": "abcdef", + "method": "GET" + }, + "response": { + "headers": { + "connection": "close", + "content-type": "text/plain; charset=utf-8", + "date": "Mon, 26 Apr 2021 05:18:14 GMT", + "server": "APISIX/2.5", + "transfer-encoding": "chunked" + }, + "size": 190, + "status": 200 + }, + "server": { + "hostname": "localhost", + "version": "2.5" + }, + "latency": 0 + } + ``` + +- **origin**: + + ```http + GET /hello?ab=cd HTTP/1.1 + host: localhost + content-length: 6 + connection: close + + abcdef + ``` + +## 工作原理 + +消息将首先写入缓冲区。 +当缓冲区超过 `batch_max_size` 时,它将发送到 rocketmq 服务器, +或每个 `buffer_duration` 刷新缓冲区。 + +如果成功,则返回 `true`。 +如果出现错误,则返回 `nil`,并带有描述错误的字符串(`buffer overflow`)。 + +### Nameserver 列表 + +配置多个nameserver地址如下: + +```json +{ + "127.0.0.1":9876, + "127.0.0.2":9876 +} +``` + +## 如何启用 + +1. 为特定路由启用 rocketmq-logger 插件。 + +```shell +curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" +}' +``` + +## 测试插件 + + 成功 + +```shell +$ curl -i http://127.0.0.1:9080/hello +HTTP/1.1 200 OK +... +hello, world +``` + +## 插件元数据设置 + +| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 | +| ---------------- | ------- | ------ | ------------- | ------- | ------------------------------------------------ | +| log_format | object | 可选 | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | 以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$` 开头,则表明是要获取 __APISIX__ 变量或 [Nginx 内置变量](http://nginx.org/en/docs/varindex.html)。特别的,**该设置是全局生效的**,意味着指定 log_format 后,将对所有绑定 http-logger 的 Route 或 Service 生效。 | + +**APISIX 变量** + +| 变量名 | 描述 | 使用示例 | +|------------------|-------------------------|----------------| +| route_id | `route` 的 id | $route_id | +| route_name | `route` 的 name | $route_name | +| service_id | `service` 的 id | $service_id | +| service_name | `service` 的 name | $service_name | +| consumer_name | `consumer` 的 username | $consumer_name | + +### 设置日志格式示例 + +```shell +curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/rocketmq-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "log_format": { + "host": "$host", + "@timestamp": "$time_iso8601", + "client_ip": "$remote_addr" + } +}' +``` + +在日志收集处,将得到类似下面的日志: + +```shell +{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"} +{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"} +``` + +## 禁用插件 + +当您要禁用 `rocketmq-logger` 插件时,这很简单,您可以在插件配置中删除相应的 json 配置,无需重新启动服务,它将立即生效: + +```shell +$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "methods": ["GET"], + "uri": "/hello", + "plugins": {}, + "upstream": { + "type": "roundrobin", + "nodes": { + "127.0.0.1:1980": 1 + } + } +}' +``` diff --git a/rockspec/apisix-master-0.rockspec b/rockspec/apisix-master-0.rockspec index f3e37f08c0bc..4b62943b371e 100644 --- a/rockspec/apisix-master-0.rockspec +++ b/rockspec/apisix-master-0.rockspec @@ -72,6 +72,7 @@ dependencies = { "api7-snowflake = 2.0-1", "inspect == 3.1.1", "lualdap = 1.2.6-1", + "lua-resty-rocketmq = 0.2.1-0", } build = { diff --git a/t/admin/plugins.t b/t/admin/plugins.t index 4305b66599bf..fe7489f7a805 100644 --- a/t/admin/plugins.t +++ b/t/admin/plugins.t @@ -106,6 +106,7 @@ google-cloud-logging sls-logger tcp-logger kafka-logger +rocketmq-logger syslog udp-logger example-plugin diff --git a/t/debug/debug-mode.t b/t/debug/debug-mode.t index 2f38dcbf5151..026569558546 100644 --- a/t/debug/debug-mode.t +++ b/t/debug/debug-mode.t @@ -81,6 +81,7 @@ loaded plugin and sort by priority: 410 name: http-logger loaded plugin and sort by priority: 406 name: sls-logger loaded plugin and sort by priority: 405 name: tcp-logger loaded plugin and sort by priority: 403 name: kafka-logger +loaded plugin and sort by priority: 402 name: rocketmq-logger loaded plugin and sort by priority: 401 name: syslog loaded plugin and sort by priority: 400 name: udp-logger loaded plugin and sort by priority: 0 name: example-plugin diff --git a/t/plugin/rocketmq-logger-log-format.t b/t/plugin/rocketmq-logger-log-format.t new file mode 100644 index 000000000000..1c4de982b21b --- /dev/null +++ b/t/plugin/rocketmq-logger-log-format.t @@ -0,0 +1,124 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +log_level('info'); +repeat_each(1); +no_long_string(); +no_root_location(); + +run_tests; + +__DATA__ + +=== TEST 1: add plugin metadata +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/rocketmq-logger', + ngx.HTTP_PUT, + [[{ + "log_format": { + "host": "$host", + "@timestamp": "$time_iso8601", + "client_ip": "$remote_addr" + } + }]], + [[{ + "node": { + "value": { + "log_format": { + "host": "$host", + "@timestamp": "$time_iso8601", + "client_ip": "$remote_addr" + } + } + }, + "action": "set" + }]] + ) + + ngx.status = code + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 2: set route(id: 1), batch_max_size=1 +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "tag" : "tag1", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 3: hit route and report rocketmq logger +--- request +GET /hello +--- response_body +hello world +--- wait: 0.5 +--- no_error_log +[error] +--- error_log eval +qr/send data to rocketmq: \{.*"host":"localhost"/ diff --git a/t/plugin/rocketmq-logger.t b/t/plugin/rocketmq-logger.t new file mode 100644 index 000000000000..6ca262495fee --- /dev/null +++ b/t/plugin/rocketmq-logger.t @@ -0,0 +1,1212 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); +run_tests; + +__DATA__ + +=== TEST 1: sanity +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({ + rocketmq_topic = "test", + key = "key1", + nameserver_list = { + ["127.0.0.1"] = 3 + } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +done +--- no_error_log +[error] + + + +=== TEST 2: missing nameserver list +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({rocketmq_topic = "test", key= "key1"}) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +property "nameserver_list" is required +done +--- no_error_log +[error] + + + +=== TEST 3: wrong type of string +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({ + nameserver_list = { + ["127.0.0.1"] = 3000 + }, + timeout = "10", + rocketmq_topic ="test", + key= "key1" + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +property "timeout" validation failed: wrong type: expected integer, got string +done +--- no_error_log +[error] + + + +=== TEST 4: set route(id: 1) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]], + [[{ + "node": { + "value": { + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 5: access +--- request +GET /hello +--- response_body +hello world +--- no_error_log +[error] +--- wait: 2 + + + +=== TEST 6: error log +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876, + "127.0.0.1":9877 + }, + "rocketmq_topic" : "test2", + "producer_type": "sync", + "key" : "key1", + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]], + [[{ + "node": { + "value": { + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876, + "127.0.0.1":9877 + }, + "rocketmq_topic" : "test2", + "producer_type": "sync", + "key" : "key1", + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + local http = require "resty.http" + local httpc = http.new() + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local res, err = httpc:request_uri(uri, {method = "GET"}) + } + } +--- request +GET /t +--- error_log +failed to send data to rocketmq topic +[error] +--- wait: 1 + + + +=== TEST 7: set route(meta_format = origin, include_req_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": true, + "meta_format": "origin" + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 8: hit route, report log to rocketmq +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log +send data to rocketmq: GET /hello?ab=cd HTTP/1.1 +host: localhost +content-length: 6 +connection: close + +abcdef +--- wait: 2 + + + +=== TEST 9: set route(meta_format = origin, include_req_body = false) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false, + "meta_format": "origin" + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 10: hit route, report log to rocketmq +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log +send data to rocketmq: GET /hello?ab=cd HTTP/1.1 +host: localhost +content-length: 6 +connection: close +--- wait: 2 + + + +=== TEST 11: set route(meta_format = default) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 12: hit route, report log to rocketmq +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log_like eval +qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/ +--- wait: 2 + + + +=== TEST 13: set route(id: 1), missing key field +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]], + [[{ + "node": { + "value": { + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 14: access, test key field is optional +--- request +GET /hello +--- response_body +hello world +--- no_error_log +[error] +--- wait: 2 + + + +=== TEST 15: set route(meta_format = default), missing key field +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 16: hit route, report log to rocketmq +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log_like eval +qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/ +--- wait: 2 + + + +=== TEST 17: use the topic with 3 partitions +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1": 9876 + }, + "rocketmq_topic" : "test3", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 18: report log to rocketmq by different partitions +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1": 9876 + }, + "rocketmq_topic" : "test3", + "producer_type": "sync", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + } + } +--- request +GET /t +--- timeout: 5s +--- ignore_response +--- no_error_log +[error] +--- error_log eval +[qr/queue: 1/, +qr/queue: 0/, +qr/queue: 2/] + + + +=== TEST 19: report log to rocketmq by different partitions in async mode +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1": 9876 + }, + "rocketmq_topic" : "test3", + "producer_type": "async", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + } + } +--- request +GET /t +--- timeout: 5s +--- ignore_response +--- no_error_log +[error] +--- error_log eval +[qr/queue: 1/, +qr/queue: 0/, +qr/queue: 2/] + + + +=== TEST 20: update the nameserver_list, generate different rocketmq producers +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + ngx.sleep(0.5) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + code, body = t('/apisix/admin/routes/1/plugins', + ngx.HTTP_PATCH, + [[{ + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1": 9876 + }, + "rocketmq_topic" : "test2", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + + code, body = t('/apisix/admin/routes/1/plugins', + ngx.HTTP_PATCH, + [[{ + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1": 19876 + }, + "rocketmq_topic" : "test4", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + + ngx.sleep(2) + ngx.say("passed") + } + } +--- request +GET /t +--- timeout: 10 +--- response +passed +--- wait: 5 +--- error_log +phase_func(): rocketmq nameserver_list[1] port 9876 +phase_func(): rocketmq nameserver_list[1] port 19876 +--- no_error_log eval +qr/not found topic/ + + + +=== TEST 21: use the topic that does not exist on rocketmq(even if rocketmq allows auto create topics, first time push messages to rocketmq would got this error) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1/plugins', + ngx.HTTP_PATCH, + [[{ + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1": 9876 + }, + "rocketmq_topic" : "undefined_topic", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + + ngx.sleep(2) + ngx.say("passed") + } + } +--- request +GET /t +--- timeout: 5 +--- response +passed +--- error_log eval +qr/getTopicRouteInfoFromNameserver return TOPIC_NOT_EXIST, No topic route info in name server for the topic: undefined_topic/ + + + +=== TEST 22: check nameserver_list via schema +--- config + location /t { + content_by_lua_block { + local data = { + { + input = { + nameserver_list = {}, + rocketmq_topic = "test", + key= "key1", + }, + }, + { + input = { + nameserver_list = { + ["127.0.0.1"] = "9876" + }, + rocketmq_topic = "test", + key= "key1", + }, + }, + { + input = { + nameserver_list = { + ["127.0.0.1"] = 0 + }, + rocketmq_topic = "test", + key= "key1", + }, + }, + { + input = { + nameserver_list = { + ["127.0.0.1"] = 65536 + }, + rocketmq_topic = "test", + key= "key1", + }, + }, + } + + local plugin = require("apisix.plugins.rocketmq-logger") + + local err_count = 0 + for i in ipairs(data) do + local ok, err = plugin.check_schema(data[i].input) + if not ok then + err_count = err_count + 1 + ngx.say(err) + end + end + + assert(err_count == #data) + } + } +--- request +GET /t +--- response_body +property "nameserver_list" validation failed: expect object to have at least 1 properties +property "nameserver_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): wrong type: expected integer, got string +property "nameserver_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 0 to be greater than 1 +property "nameserver_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 65536 to be smaller than 65535 +--- no_error_log +[error] + + + +=== TEST 23: rocketmq nameserver list info in log +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.127":9876 + }, + "rocketmq_topic" : "test2", + "producer_type": "sync", + "key" : "key1", + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + local http = require "resty.http" + local httpc = http.new() + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local res, err = httpc:request_uri(uri, {method = "GET"}) + } + } +--- request +GET /t +--- error_log_like eval +qr/create new rocketmq producer instance, nameserver_list: \[\{"port":9876,"host":"127.0.0.127"}]/ +qr/failed to send data to rocketmq topic: .*, nameserver_list: \{"127.0.0.127":9876}/ + + +=== TEST 24: delete plugin metadata, tests would fail if run rocketmq-logger-log-format.t and plugin metadata is added +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/rocketmq-logger', + ngx.HTTP_DELETE, + nil, + [[{"action": "delete"}]]) + } + } +--- request +GET /t +--- response_body + +--- no_error_log +[error] + + +=== TEST 25: set route(id: 1,include_req_body = true,include_req_body_expr = array) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "include_req_body": true, + "include_req_body_expr": [ + [ + "arg_name", + "==", + "qwerty" + ] + ], + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 26: hit route, expr eval success +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log eval +qr/send data to rocketmq: \{.*"body":"abcdef"/ +--- wait: 2 + + + +=== TEST 27: hit route,expr eval fail +--- request +POST /hello?name=zcxv +abcdef +--- response_body +hello world +--- no_error_log eval +qr/send data to rocketmq: \{.*"body":"abcdef"/ +--- wait: 2 + + + +=== TEST 28: check log schema(include_req_body) +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({ + rocketmq_topic = "test", + key = "key1", + nameserver_list = { + ["127.0.0.1"] = 3 + }, + include_req_body = true, + include_req_body_expr = { + {"bar", "<>", "foo"} + } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +failed to validate the 'include_req_body_expr' expression: invalid operator '<>' +done +--- no_error_log +[error] + + + +=== TEST 29: check log schema(include_resp_body) +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({ + rocketmq_topic = "test", + key = "key1", + nameserver_list = { + ["127.0.0.1"] = 3 + }, + include_resp_body = true, + include_resp_body_expr = { + {"bar", "", "foo"} + } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +failed to validate the 'include_resp_body_expr' expression: invalid operator '' +done +--- no_error_log +[error] + + + +=== TEST 30: set route(id: 1,include_resp_body = true,include_resp_body_expr = array) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "include_resp_body": true, + "include_resp_body_expr": [ + [ + "arg_name", + "==", + "qwerty" + ] + ], + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 31: hit route, expr eval success +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log eval +qr/send data to rocketmq: \{.*"body":"hello world\\n"/ +--- wait: 2 + + + +=== TEST 32: hit route,expr eval fail +--- request +POST /hello?name=zcxv +abcdef +--- response_body +hello world +--- no_error_log eval +qr/send data to rocketmq: \{.*"body":"hello world\\n"/ +--- wait: 2