From 48dd13a1f4517cf77bf966d14fc6683702eadcc4 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Tue, 30 Nov 2021 02:02:33 +0800 Subject: [PATCH 01/23] feat: rocketmq logger --- README.md | 2 +- apisix/plugins/rocketmq-logger.lua | 271 +++++ conf/config-default.yaml | 1 + docs/en/latest/config.json | 1 + docs/en/latest/plugins/rocketmq-logger.md | 237 ++++ docs/zh/latest/README.md | 2 +- docs/zh/latest/config.json | 1 + docs/zh/latest/plugins/rocketmq-logger.md | 232 ++++ rockspec/apisix-master-0.rockspec | 1 + t/admin/plugins.t | 1 + t/debug/debug-mode.t | 1 + t/plugin/rocketmq-logger-log-format.t | 124 +++ t/plugin/rocketmq-logger.t | 1214 +++++++++++++++++++++ 13 files changed, 2086 insertions(+), 2 deletions(-) create mode 100644 apisix/plugins/rocketmq-logger.lua create mode 100644 docs/en/latest/plugins/rocketmq-logger.md create mode 100644 docs/zh/latest/plugins/rocketmq-logger.md create mode 100644 t/plugin/rocketmq-logger-log-format.t create mode 100644 t/plugin/rocketmq-logger.t diff --git a/README.md b/README.md index abce5e2a5733..f686a34171d0 100644 --- a/README.md +++ b/README.md @@ -134,7 +134,7 @@ A/B testing, canary release, blue-green deployment, limit rate, defense against - High performance: The single-core QPS reaches 18k with an average delay of fewer than 0.2 milliseconds. - [Fault Injection](docs/en/latest/plugins/fault-injection.md) - [REST Admin API](docs/en/latest/admin-api.md): Using the REST Admin API to control Apache APISIX, which only allows 127.0.0.1 access by default, you can modify the `allow_admin` field in `conf/config.yaml` to specify a list of IPs that are allowed to call the Admin API. Also, note that the Admin API uses key auth to verify the identity of the caller. **The `admin_key` field in `conf/config.yaml` needs to be modified before deployment to ensure security**. - - External Loggers: Export access logs to external log management tools. ([HTTP Logger](docs/en/latest/plugins/http-logger.md), [TCP Logger](docs/en/latest/plugins/tcp-logger.md), [Kafka Logger](docs/en/latest/plugins/kafka-logger.md), [UDP Logger](docs/en/latest/plugins/udp-logger.md), [Google Cloud Logging](docs/en/latest/plugins/google-cloud-logging.md)) + - External Loggers: Export access logs to external log management tools. ([HTTP Logger](docs/en/latest/plugins/http-logger.md), [TCP Logger](docs/en/latest/plugins/tcp-logger.md), [Kafka Logger](docs/en/latest/plugins/kafka-logger.md), [UDP Logger](docs/en/latest/plugins/udp-logger.md), [Google Cloud Logging](docs/en/latest/plugins/google-cloud-logging.md), [RocketMQ Logger](docs/en/latest/plugins/rocketmq-logger.md)) - [Datadog](docs/en/latest/plugins/datadog.md): push custom metrics to the DogStatsD server, comes bundled with [Datadog agent](https://docs.datadoghq.com/agent/), over the UDP protocol. DogStatsD basically is an implementation of StatsD protocol which collects the custom metrics for Apache APISIX agent, aggregates it into a single data point and sends it to the configured Datadog server. - [Helm charts](https://github.com/apache/apisix-helm-chart) diff --git a/apisix/plugins/rocketmq-logger.lua b/apisix/plugins/rocketmq-logger.lua new file mode 100644 index 000000000000..662caea2a798 --- /dev/null +++ b/apisix/plugins/rocketmq-logger.lua @@ -0,0 +1,271 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local core = require("apisix.core") +local log_util = require("apisix.utils.log-util") +local producer = require ("resty.rocketmq.producer") +local acl_rpchook = require("resty.rocketmq.acl_rpchook") +local batch_processor = require("apisix.utils.batch-processor") +local plugin = require("apisix.plugin") + +local pairs = pairs +local type = type +local ipairs = ipairs +local plugin_name = "rocketmq-logger" +local stale_timer_running = false +local timer_at = ngx.timer.at +local ngx = ngx +local buffers = {} + +local lrucache = core.lrucache.new({ + type = "plugin", +}) + +local schema = { + type = "object", + properties = { + meta_format = { + type = "string", + default = "default", + enum = {"default", "origin"}, + }, + nameserver_list = { + type = "object", + minProperties = 1, + patternProperties = { + [".*"] = { + description = "the port of rocketmq nameserver", + type = "integer", + minimum = 1, + maximum = 65535, + }, + }, + }, + rocketmq_topic = {type = "string"}, + key = {type = "string"}, + tag = {type = "string"}, + timeout = {type = "integer", minimum = 1, default = 3}, + use_tls = {type = "boolean", default = false}, + access_key = {type = "string", default = ""}, + secret_key = {type = "string", default = ""}, + name = {type = "string", default = "rocketmq logger"}, + max_retry_count = {type = "integer", minimum = 0, default = 0}, + retry_delay = {type = "integer", minimum = 0, default = 1}, + buffer_duration = {type = "integer", minimum = 1, default = 60}, + inactive_timeout = {type = "integer", minimum = 1, default = 5}, + include_req_body = {type = "boolean", default = false}, + include_req_body_expr = { + type = "array", + minItems = 1, + items = { + type = "array", + items = { + type = "string" + } + } + }, + include_resp_body = {type = "boolean", default = false}, + include_resp_body_expr = { + type = "array", + minItems = 1, + items = { + type = "array", + items = { + type = "string" + } + } + }, + }, + required = {"nameserver_list", "rocketmq_topic"} +} + +local metadata_schema = { + type = "object", + properties = { + log_format = log_util.metadata_schema_log_format, + }, +} + +local _M = { + version = 0.1, + priority = 402, + name = plugin_name, + schema = schema, + metadata_schema = metadata_schema, +} + + +function _M.check_schema(conf, schema_type) + if schema_type == core.schema.TYPE_METADATA then + return core.schema.check(metadata_schema, conf) + end + + local ok, err = core.schema.check(schema, conf) + if not ok then + return nil, err + end + return log_util.check_log_schema(conf) +end + + +-- remove stale objects from the memory after timer expires +local function remove_stale_objects(premature) + if premature then + return + end + + for key, batch in ipairs(buffers) do + if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then + core.log.warn("removing batch processor stale object, conf: ", + core.json.delay_encode(key)) + buffers[key] = nil + end + end + + stale_timer_running = false +end + + +local function create_producer(nameserver_list, producer_config) + core.log.info("create new rocketmq producer instance") + local prod = producer.new(nameserver_list, "apisixLogProducer") + if producer_config.use_tls then + prod:setUseTLS(true) + end + if producer_config.access_key ~= '' then + local aclHook = acl_rpchook.new(producer_config.access_key, producer_config.secret_key) + prod:addRPCHook(aclHook) + end + prod:setTimeout(producer_config.timeout) + return prod +end + + +local function send_rocketmq_data(conf, log_message, prod) + local result, err = prod:send(conf.rocketmq_topic, log_message, conf.tag, conf.key) + if not result then + return false, "failed to send data to rocketmq topic: " .. err .. + ", nameserver_list: " .. core.json.encode(conf.nameserver_list) + end + + core.log.info("queue: ", result.sendResult.messageQueue.queueId) + + return true +end + + +function _M.body_filter(conf, ctx) + log_util.collect_body(conf, ctx) +end + + +function _M.log(conf, ctx) + local entry + if conf.meta_format == "origin" then + entry = log_util.get_req_original(ctx, conf) + -- core.log.info("origin entry: ", entry) + + else + local metadata = plugin.plugin_metadata(plugin_name) + core.log.info("metadata: ", core.json.delay_encode(metadata)) + if metadata and metadata.value.log_format + and core.table.nkeys(metadata.value.log_format) > 0 + then + entry = log_util.get_custom_format_log(ctx, metadata.value.log_format) + core.log.info("custom log format entry: ", core.json.delay_encode(entry)) + else + entry = log_util.get_full_log(ngx, conf) + core.log.info("full log entry: ", core.json.delay_encode(entry)) + end + end + + if not stale_timer_running then + -- run the timer every 30 mins if any log is present + timer_at(1800, remove_stale_objects) + stale_timer_running = true + end + + local log_buffer = buffers[conf] + if log_buffer then + log_buffer:push(entry) + return + end + + -- reuse producer via lrucache to avoid unbalanced partitions of messages in rocketmq + local nameserver_list = core.table.new(core.table.nkeys(conf.nameserver_list), 0) + + for host, port in pairs(conf.nameserver_list) do + local nameserver = host .. ':' .. port + core.table.insert(nameserver_list, nameserver) + end + + local producer_config = { + timeout = conf.timeout * 1000, + use_tls = conf.use_tls, + access_key = conf.access_key, + secret_key = conf.secret_key, + } + + local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer, + nameserver_list, producer_config) + if err then + return nil, "failed to create the rocketmq producer: " .. err + end + core.log.info("rocketmq nameserver_list[1] port ", + prod.client.nameservers[1].port) + -- Generate a function to be executed by the batch processor + local func = function(entries, batch_max_size) + local data, err + if batch_max_size == 1 then + data = entries[1] + if type(data) ~= "string" then + data, err = core.json.encode(data) -- encode as single {} + end + else + data, err = core.json.encode(entries) -- encode as array [{}] + end + + if not data then + return false, 'error occurred while encoding the data: ' .. err + end + + core.log.info("send data to rocketmq: ", data) + return send_rocketmq_data(conf, data, prod) + end + + local config = { + name = conf.name, + retry_delay = conf.retry_delay, + batch_max_size = conf.batch_max_size, + max_retry_count = conf.max_retry_count, + buffer_duration = conf.buffer_duration, + inactive_timeout = conf.inactive_timeout, + } + + local err + log_buffer, err = batch_processor:new(func, config) + + if not log_buffer then + core.log.error("error when creating the batch processor: ", err) + return + end + + buffers[conf] = log_buffer + log_buffer:push(entry) +end + + +return _M diff --git a/conf/config-default.yaml b/conf/config-default.yaml index dc42e04bea5c..5c77dbc9f531 100644 --- a/conf/config-default.yaml +++ b/conf/config-default.yaml @@ -349,6 +349,7 @@ plugins: # plugin list (sorted by priority) - sls-logger # priority: 406 - tcp-logger # priority: 405 - kafka-logger # priority: 403 + - rocketmq-logger # priority: 402 - syslog # priority: 401 - udp-logger # priority: 400 #- log-rotate # priority: 100 diff --git a/docs/en/latest/config.json b/docs/en/latest/config.json index d40a0ded540d..04c90171de10 100644 --- a/docs/en/latest/config.json +++ b/docs/en/latest/config.json @@ -116,6 +116,7 @@ "plugins/skywalking-logger", "plugins/tcp-logger", "plugins/kafka-logger", + "plugins/rocketmq-logger", "plugins/udp-logger", "plugins/syslog", "plugins/log-rotate", diff --git a/docs/en/latest/plugins/rocketmq-logger.md b/docs/en/latest/plugins/rocketmq-logger.md new file mode 100644 index 000000000000..3f660b4b1fa7 --- /dev/null +++ b/docs/en/latest/plugins/rocketmq-logger.md @@ -0,0 +1,237 @@ +--- +title: rocketmq-logger +--- + + + +## Summary + +- [**Name**](#name) +- [**Attributes**](#attributes) +- [**Info**](#info) +- [**How To Enable**](#how-to-enable) +- [**Test Plugin**](#test-plugin) +- [**Disable Plugin**](#disable-plugin) + +## Name + +`rocketmq-logger` is a plugin which works as a rocketmq client driver for the ngx_lua nginx module. + +This plugin provides the ability to push requests log data as JSON objects to your external rocketmq clusters. In case if you did not receive the log data don't worry give it some time it will automatically send the logs after the timer function expires in our Batch Processor. + +For more info on Batch-Processor in Apache APISIX please refer. +[Batch-Processor](../batch-processor.md) + +## Attributes + +| Name | Type | Requirement | Default | Valid | Description | +| ---------------- | ------- | ----------- | -------------- | ------- | ---------------------------------------------------------------------------------------- | +| nameserver_list | object | required | | | An array of rocketmq nameservers. | +| rocketmq_topic | string | required | | | Target topic to push data. | +| key | string | optional | | | Keys of messages to send. | +| tag | string | optional | | | Tags of messages to send. | +| timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to send data. | +| use_tls | boolean | optional | false | | Whether to open TLS | +| access_key | string | optional | "" | | access key for ACL, empty string means disable ACL. | +| secret_key | string | optional | "" | | secret key for ACL。 | +| name | string | optional | "rocketmq logger" | | A unique identifier to identity the batch processor. | +| meta_format | enum | optional | "default" | ["default","origin"] | `default`: collect the request information with default JSON way. `origin`: collect the request information with original HTTP request. [example](#examples-of-meta_format)| +| batch_max_size | integer | optional | 1000 | [1,...] | Set the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the `rocketmq` service. | +| inactive_timeout | integer | optional | 5 | [1,...] | The maximum time to refresh the buffer (in seconds). When the maximum refresh time is reached, all logs will be automatically pushed to the `rocketmq` service regardless of whether the number of logs in the buffer reaches the set maximum number. | +| buffer_duration | integer | optional | 60 | [1,...] | Maximum age in seconds of the oldest entry in a batch before the batch must be processed.| +| max_retry_count | integer | optional | 0 | [0,...] | Maximum number of retries before removing from the processing pipe line. | +| retry_delay | integer | optional | 1 | [0,...] | Number of seconds the process execution should be delayed if the execution fails. | +| include_req_body | boolean | optional | false | [false, true] | Whether to include the request body. false: indicates that the requested body is not included; true: indicates that the requested body is included. Note: if the request body is too big to be kept in the memory, it can't be logged due to Nginx's limitation. | +| include_req_body_expr | array | optional | | | When `include_req_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the request body when the result is true. | +| include_resp_body| boolean | optional | false | [false, true] | Whether to include the response body. The response body is included if and only if it is `true`. | +| include_resp_body_expr | array | optional | | | When `include_resp_body` is true, control the behavior based on the result of the [lua-resty-expr](https://github.com/api7/lua-resty-expr) expression. If present, only log the response body when the result is true. | + +### examples of meta_format + +- **default**: + + ```json + { + "upstream": "127.0.0.1:1980", + "start_time": 1619414294760, + "client_ip": "127.0.0.1", + "service_id": "", + "route_id": "1", + "request": { + "querystring": { + "ab": "cd" + }, + "size": 90, + "uri": "/hello?ab=cd", + "url": "http://localhost:1984/hello?ab=cd", + "headers": { + "host": "localhost", + "content-length": "6", + "connection": "close" + }, + "body": "abcdef", + "method": "GET" + }, + "response": { + "headers": { + "connection": "close", + "content-type": "text/plain; charset=utf-8", + "date": "Mon, 26 Apr 2021 05:18:14 GMT", + "server": "APISIX/2.5", + "transfer-encoding": "chunked" + }, + "size": 190, + "status": 200 + }, + "server": { + "hostname": "localhost", + "version": "2.5" + }, + "latency": 0 + } + ``` + +- **origin**: + + ```http + GET /hello?ab=cd HTTP/1.1 + host: localhost + content-length: 6 + connection: close + + abcdef + ``` + +## Info + +The `message` will write to the buffer first. +It will send to the rocketmq server when the buffer exceed the `batch_max_size`, +or every `buffer_duration` flush the buffer. + +In case of success, returns `true`. +In case of errors, returns `nil` with a string describing the error (`buffer overflow`). + +### Sample Nameserver list + +Specify the nameservers of the external rocketmq servers as below sample. + +```json +{ + "127.0.0.1":9876, + "127.0.0.2":9876 +} +``` + +## How To Enable + +The following is an example on how to enable the rocketmq-logger for a specific route. + +```shell +curl http://127.0.0.1:9080/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "batch_max_size": 1, + "name": "rocketmq logger" + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" +}' +``` + +## Test Plugin + +success: + +```shell +$ curl -i http://127.0.0.1:9080/hello +HTTP/1.1 200 OK +... +hello, world +``` + +## Metadata + +| Name | Type | Requirement | Default | Valid | Description | +| ---------------- | ------- | ----------- | ------------- | ------- | ---------------------------------------------------------------------------------------- | +| log_format | object | optional | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | Log format declared as key value pair in JSON format. Only string is supported in the `value` part. If the value starts with `$`, it means to get `APISIX` variables or [Nginx variable](http://nginx.org/en/docs/varindex.html). | + + Note that **the metadata configuration is applied in global scope**, which means it will take effect on all Route or Service which use rocketmq-logger plugin. + +**APISIX Variables** + +| Variable Name | Description | Usage Example | +|------------------|-------------------------|----------------| +| route_id | id of `route` | $route_id | +| route_name | name of `route` | $route_name | +| service_id | id of `service` | $service_id | +| service_name | name of `service` | $service_name | +| consumer_name | username of `consumer` | $consumer_name | + +### Example + +```shell +curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/rocketmq-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "log_format": { + "host": "$host", + "@timestamp": "$time_iso8601", + "client_ip": "$remote_addr" + } +}' +``` + +It is expected to see some logs like that: + +```shell +{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"} +{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"} +``` + +## Disable Plugin + +Remove the corresponding json configuration in the plugin configuration to disable the `rocketmq-logger`. +APISIX plugins are hot-reloaded, therefore no need to restart APISIX. + +```shell +$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "methods": ["GET"], + "uri": "/hello", + "plugins": {}, + "upstream": { + "type": "roundrobin", + "nodes": { + "127.0.0.1:1980": 1 + } + } +}' +``` diff --git a/docs/zh/latest/README.md b/docs/zh/latest/README.md index c8a3a654cafa..d46eb94a851b 100644 --- a/docs/zh/latest/README.md +++ b/docs/zh/latest/README.md @@ -135,7 +135,7 @@ A/B 测试、金丝雀发布(灰度发布)、蓝绿部署、限流限速、抵 - 高性能:在单核上 QPS 可以达到 18k,同时延迟只有 0.2 毫秒。 - [故障注入](plugins/fault-injection.md) - [REST Admin API](admin-api.md): 使用 REST Admin API 来控制 Apache APISIX,默认只允许 127.0.0.1 访问,你可以修改 `conf/config.yaml` 中的 `allow_admin` 字段,指定允许调用 Admin API 的 IP 列表。同时需要注意的是,Admin API 使用 key auth 来校验调用者身份,**在部署前需要修改 `conf/config.yaml` 中的 `admin_key` 字段,来保证安全。** - - 外部日志记录器:将访问日志导出到外部日志管理工具。([HTTP Logger](plugins/http-logger.md), [TCP Logger](plugins/tcp-logger.md), [Kafka Logger](plugins/kafka-logger.md), [UDP Logger](plugins/udp-logger.md), [Google Cloud Logging](plugins/google-cloud-logging.md)) + - 外部日志记录器:将访问日志导出到外部日志管理工具。([HTTP Logger](plugins/http-logger.md), [TCP Logger](plugins/tcp-logger.md), [Kafka Logger](plugins/kafka-logger.md), [UDP Logger](plugins/udp-logger.md), [Google Cloud Logging](plugins/google-cloud-logging.md), [RocketMQ Logger](plugins/rocketmq-logger.md)) - [Helm charts](https://github.com/apache/apisix-helm-chart) - **高度可扩展** diff --git a/docs/zh/latest/config.json b/docs/zh/latest/config.json index 9a2eef66dd11..cd914b7a4700 100644 --- a/docs/zh/latest/config.json +++ b/docs/zh/latest/config.json @@ -114,6 +114,7 @@ "plugins/skywalking-logger", "plugins/tcp-logger", "plugins/kafka-logger", + "plugins/rocketmq-logger", "plugins/udp-logger", "plugins/syslog", "plugins/log-rotate", diff --git a/docs/zh/latest/plugins/rocketmq-logger.md b/docs/zh/latest/plugins/rocketmq-logger.md new file mode 100644 index 000000000000..389813d29c88 --- /dev/null +++ b/docs/zh/latest/plugins/rocketmq-logger.md @@ -0,0 +1,232 @@ +--- +title: rocketmq-logger +--- + + + +## 目录 + +- [**简介**](#简介) +- [**属性**](#属性) +- [**工作原理**](#工作原理) +- [**如何启用**](#如何启用) +- [**测试插件**](#测试插件) +- [**禁用插件**](#禁用插件) + +## 简介 + +`rocketmq-logger` 是一个插件,可用作ngx_lua nginx 模块的 rocketmq 客户端驱动程序。 + +它可以将接口请求日志以 JSON 的形式推送给外部 rocketmq 集群。如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。 + +有关 Apache APISIX 中 Batch-Processor 的更多信息,请参考。 +[Batch-Processor](../batch-processor.md) + +## 属性 + +| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 | +| ---------------- | ------- | ------ | -------------- | ------- | ------------------------------------------------ | +| nameserver_list | object | 必须 | | | 要推送的 rocketmq 的 nameserver 列表。 | +| rocketmq_topic | string | 必须 | | | 要推送的 topic。 | +| key | string | 可选 | | | 发送消息的keys。 | +| tag | string | 可选 | | | 发送消息的tags。 | +| timeout | integer | 可选 | 3 | [1,...] | 发送数据的超时时间。 | +| use_tls | boolean | 可选 | false | | 是否开启TLS加密。 | +| access_key | string | 可选 | "" | | ACL认证的access key,空字符串表示不开启ACL。 | +| secret_key | string | 可选 | "" | | ACL认证的secret key。 | +| name | string | 可选 | "rocketmq logger" | | batch processor 的唯一标识。 | +| meta_format | enum | 可选 | "default" | ["default","origin"] | `default`:获取请求信息以默认的 JSON 编码方式。`origin`:获取请求信息以 HTTP 原始请求方式。[具体示例](#meta_format-参考示例)| +| batch_max_size | integer | 可选 | 1000 | [1,...] | 设置每批发送日志的最大条数,当日志条数达到设置的最大值时,会自动推送全部日志到 `rocketmq` 服务。| +| inactive_timeout | integer | 可选 | 5 | [1,...] | 刷新缓冲区的最大时间(以秒为单位),当达到最大的刷新时间时,无论缓冲区中的日志数量是否达到设置的最大条数,也会自动将全部日志推送到 `rocketmq` 服务。 | +| buffer_duration | integer | 可选 | 60 | [1,...] | 必须先处理批次中最旧条目的最长期限(以秒为单位)。 | +| max_retry_count | integer | 可选 | 0 | [0,...] | 从处理管道中移除之前的最大重试次数。 | +| retry_delay | integer | 可选 | 1 | [0,...] | 如果执行失败,则应延迟执行流程的秒数。 | +| include_req_body | boolean | 可选 | false | [false, true] | 是否包括请求 body。false: 表示不包含请求的 body ;true: 表示包含请求的 body。注意:如果请求 body 没办法完全放在内存中,由于 Nginx 的限制,我们没有办法把它记录下来。| +| include_req_body_expr | array | 可选 | | | 当 `include_req_body` 开启时, 基于 [lua-resty-expr](https://github.com/api7/lua-resty-expr) 表达式的结果进行记录。如果该选项存在,只有在表达式为真的时候才会记录请求 body。 | +| include_resp_body| boolean | 可选 | false | [false, true] | 是否包括响应体。包含响应体,当为`true`。 | +| include_resp_body_expr | array | 可选 | | | 是否采集响体, 基于[lua-resty-expr](https://github.com/api7/lua-resty-expr)。 该选项需要开启 `include_resp_body`| + +### meta_format 参考示例 + +- **default**: + + ```json + { + "upstream": "127.0.0.1:1980", + "start_time": 1619414294760, + "client_ip": "127.0.0.1", + "service_id": "", + "route_id": "1", + "request": { + "querystring": { + "ab": "cd" + }, + "size": 90, + "uri": "/hello?ab=cd", + "url": "http://localhost:1984/hello?ab=cd", + "headers": { + "host": "localhost", + "content-length": "6", + "connection": "close" + }, + "body": "abcdef", + "method": "GET" + }, + "response": { + "headers": { + "connection": "close", + "content-type": "text/plain; charset=utf-8", + "date": "Mon, 26 Apr 2021 05:18:14 GMT", + "server": "APISIX/2.5", + "transfer-encoding": "chunked" + }, + "size": 190, + "status": 200 + }, + "server": { + "hostname": "localhost", + "version": "2.5" + }, + "latency": 0 + } + ``` + +- **origin**: + + ```http + GET /hello?ab=cd HTTP/1.1 + host: localhost + content-length: 6 + connection: close + + abcdef + ``` + +## 工作原理 + +消息将首先写入缓冲区。 +当缓冲区超过 `batch_max_size` 时,它将发送到 rocketmq 服务器, +或每个 `buffer_duration` 刷新缓冲区。 + +如果成功,则返回 `true`。 +如果出现错误,则返回 `nil`,并带有描述错误的字符串(`buffer overflow`)。 + +### Nameserver 列表 + +配置多个nameserver地址如下: + +```json +{ + "127.0.0.1":9876, + "127.0.0.2":9876 +} +``` + +## 如何启用 + +1. 为特定路由启用 rocketmq-logger 插件。 + +```shell +curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" +}' +``` + +## 测试插件 + + 成功 + +```shell +$ curl -i http://127.0.0.1:9080/hello +HTTP/1.1 200 OK +... +hello, world +``` + +## 插件元数据设置 + +| 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 | +| ---------------- | ------- | ------ | ------------- | ------- | ------------------------------------------------ | +| log_format | object | 可选 | {"host": "$host", "@timestamp": "$time_iso8601", "client_ip": "$remote_addr"} | | 以 JSON 格式的键值对来声明日志格式。对于值部分,仅支持字符串。如果是以 `$` 开头,则表明是要获取 __APISIX__ 变量或 [Nginx 内置变量](http://nginx.org/en/docs/varindex.html)。特别的,**该设置是全局生效的**,意味着指定 log_format 后,将对所有绑定 http-logger 的 Route 或 Service 生效。 | + +**APISIX 变量** + +| 变量名 | 描述 | 使用示例 | +|------------------|-------------------------|----------------| +| route_id | `route` 的 id | $route_id | +| route_name | `route` 的 name | $route_name | +| service_id | `service` 的 id | $service_id | +| service_name | `service` 的 name | $service_name | +| consumer_name | `consumer` 的 username | $consumer_name | + +### 设置日志格式示例 + +```shell +curl http://127.0.0.1:9080/apisix/admin/plugin_metadata/rocketmq-logger -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "log_format": { + "host": "$host", + "@timestamp": "$time_iso8601", + "client_ip": "$remote_addr" + } +}' +``` + +在日志收集处,将得到类似下面的日志: + +```shell +{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"} +{"host":"localhost","@timestamp":"2020-09-23T19:05:05-04:00","client_ip":"127.0.0.1","route_id":"1"} +``` + +## 禁用插件 + +当您要禁用 `rocketmq-logger` 插件时,这很简单,您可以在插件配置中删除相应的 json 配置,无需重新启动服务,它将立即生效: + +```shell +$ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d ' +{ + "methods": ["GET"], + "uri": "/hello", + "plugins": {}, + "upstream": { + "type": "roundrobin", + "nodes": { + "127.0.0.1:1980": 1 + } + } +}' +``` diff --git a/rockspec/apisix-master-0.rockspec b/rockspec/apisix-master-0.rockspec index f3e37f08c0bc..4b62943b371e 100644 --- a/rockspec/apisix-master-0.rockspec +++ b/rockspec/apisix-master-0.rockspec @@ -72,6 +72,7 @@ dependencies = { "api7-snowflake = 2.0-1", "inspect == 3.1.1", "lualdap = 1.2.6-1", + "lua-resty-rocketmq = 0.2.1-0", } build = { diff --git a/t/admin/plugins.t b/t/admin/plugins.t index 4305b66599bf..fe7489f7a805 100644 --- a/t/admin/plugins.t +++ b/t/admin/plugins.t @@ -106,6 +106,7 @@ google-cloud-logging sls-logger tcp-logger kafka-logger +rocketmq-logger syslog udp-logger example-plugin diff --git a/t/debug/debug-mode.t b/t/debug/debug-mode.t index 2f38dcbf5151..026569558546 100644 --- a/t/debug/debug-mode.t +++ b/t/debug/debug-mode.t @@ -81,6 +81,7 @@ loaded plugin and sort by priority: 410 name: http-logger loaded plugin and sort by priority: 406 name: sls-logger loaded plugin and sort by priority: 405 name: tcp-logger loaded plugin and sort by priority: 403 name: kafka-logger +loaded plugin and sort by priority: 402 name: rocketmq-logger loaded plugin and sort by priority: 401 name: syslog loaded plugin and sort by priority: 400 name: udp-logger loaded plugin and sort by priority: 0 name: example-plugin diff --git a/t/plugin/rocketmq-logger-log-format.t b/t/plugin/rocketmq-logger-log-format.t new file mode 100644 index 000000000000..1c4de982b21b --- /dev/null +++ b/t/plugin/rocketmq-logger-log-format.t @@ -0,0 +1,124 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +log_level('info'); +repeat_each(1); +no_long_string(); +no_root_location(); + +run_tests; + +__DATA__ + +=== TEST 1: add plugin metadata +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/rocketmq-logger', + ngx.HTTP_PUT, + [[{ + "log_format": { + "host": "$host", + "@timestamp": "$time_iso8601", + "client_ip": "$remote_addr" + } + }]], + [[{ + "node": { + "value": { + "log_format": { + "host": "$host", + "@timestamp": "$time_iso8601", + "client_ip": "$remote_addr" + } + } + }, + "action": "set" + }]] + ) + + ngx.status = code + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 2: set route(id: 1), batch_max_size=1 +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "tag" : "tag1", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 3: hit route and report rocketmq logger +--- request +GET /hello +--- response_body +hello world +--- wait: 0.5 +--- no_error_log +[error] +--- error_log eval +qr/send data to rocketmq: \{.*"host":"localhost"/ diff --git a/t/plugin/rocketmq-logger.t b/t/plugin/rocketmq-logger.t new file mode 100644 index 000000000000..4ec554a763eb --- /dev/null +++ b/t/plugin/rocketmq-logger.t @@ -0,0 +1,1214 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +no_long_string(); +no_root_location(); +run_tests; + +__DATA__ + +=== TEST 1: sanity +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({ + rocketmq_topic = "test", + key = "key1", + nameserver_list = { + ["127.0.0.1"] = 3 + } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +done +--- no_error_log +[error] + + + +=== TEST 2: missing nameserver list +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({rocketmq_topic = "test", key= "key1"}) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +property "nameserver_list" is required +done +--- no_error_log +[error] + + + +=== TEST 3: wrong type of string +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({ + nameserver_list = { + ["127.0.0.1"] = 3000 + }, + timeout = "10", + rocketmq_topic ="test", + key= "key1" + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +property "timeout" validation failed: wrong type: expected integer, got string +done +--- no_error_log +[error] + + + +=== TEST 4: set route(id: 1) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]], + [[{ + "node": { + "value": { + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 5: access +--- request +GET /hello +--- response_body +hello world +--- no_error_log +[error] +--- wait: 2 + + + +=== TEST 6: error log +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876, + "127.0.0.1":9877 + }, + "rocketmq_topic" : "test2", + "producer_type": "sync", + "key" : "key1", + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]], + [[{ + "node": { + "value": { + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876, + "127.0.0.1":9877 + }, + "rocketmq_topic" : "test2", + "producer_type": "sync", + "key" : "key1", + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + local http = require "resty.http" + local httpc = http.new() + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local res, err = httpc:request_uri(uri, {method = "GET"}) + } + } +--- request +GET /t +--- error_log +failed to send data to rocketmq topic +[error] +--- wait: 1 + + + +=== TEST 7: set route(meta_format = origin, include_req_body = true) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": true, + "meta_format": "origin" + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 8: hit route, report log to rocketmq +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log +send data to rocketmq: GET /hello?ab=cd HTTP/1.1 +host: localhost +content-length: 6 +connection: close + +abcdef +--- wait: 2 + + + +=== TEST 9: set route(meta_format = origin, include_req_body = false) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false, + "meta_format": "origin" + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 10: hit route, report log to rocketmq +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log +send data to rocketmq: GET /hello?ab=cd HTTP/1.1 +host: localhost +content-length: 6 +connection: close +--- wait: 2 + + + +=== TEST 11: set route(meta_format = default) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 12: hit route, report log to rocketmq +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log_like eval +qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/ +--- wait: 2 + + + +=== TEST 13: set route(id: 1), missing key field +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]], + [[{ + "node": { + "value": { + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "timeout" : 1, + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }, + "key": "/apisix/routes/1" + }, + "action": "set" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 14: access, test key field is optional +--- request +GET /hello +--- response_body +hello world +--- no_error_log +[error] +--- wait: 2 + + + +=== TEST 15: set route(meta_format = default), missing key field +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 16: hit route, report log to rocketmq +--- request +GET /hello?ab=cd +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log_like eval +qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/ +--- wait: 2 + + + +=== TEST 17: use the topic with 3 partitions +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1": 9876 + }, + "rocketmq_topic" : "test3", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 18: report log to rocketmq by different partitions +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1": 9876 + }, + "rocketmq_topic" : "test3", + "producer_type": "sync", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + } + } +--- request +GET /t +--- timeout: 5s +--- ignore_response +--- no_error_log +[error] +--- error_log eval +[qr/queue: 1/, +qr/queue: 0/, +qr/queue: 2/] + + + +=== TEST 19: report log to rocketmq by different partitions in async mode +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1": 9876 + }, + "rocketmq_topic" : "test3", + "producer_type": "async", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + } + } +--- request +GET /t +--- timeout: 5s +--- ignore_response +--- no_error_log +[error] +--- error_log eval +[qr/queue: 1/, +qr/queue: 0/, +qr/queue: 2/] + + + +=== TEST 20: update the nameserver_list, generate different rocketmq producers +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + ngx.sleep(0.5) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + code, body = t('/apisix/admin/routes/1/plugins', + ngx.HTTP_PATCH, + [[{ + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1": 9876 + }, + "rocketmq_topic" : "test2", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + + code, body = t('/apisix/admin/routes/1/plugins', + ngx.HTTP_PATCH, + [[{ + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1": 19876 + }, + "rocketmq_topic" : "test4", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + + ngx.sleep(2) + ngx.say("passed") + } + } +--- request +GET /t +--- timeout: 10 +--- response +passed +--- wait: 5 +--- error_log +phase_func(): rocketmq nameserver_list[1] port 9876 +phase_func(): rocketmq nameserver_list[1] port 19876 +--- no_error_log eval +qr/not found topic/ + + + +=== TEST 21: use the topic that does not exist on rocketmq(even if rocketmq allows auto create topics, first time push messages to rocketmq would got this error) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1/plugins', + ngx.HTTP_PATCH, + [[{ + "rocketmq-logger": { + "nameserver_list" : { + "127.0.0.1": 9876 + }, + "rocketmq_topic" : "undefined_topic", + "timeout" : 1, + "batch_max_size": 1, + "include_req_body": false + } + }]] + ) + + if code >= 300 then + ngx.status = code + ngx.say("fail") + return + end + + t('/hello',ngx.HTTP_GET) + ngx.sleep(0.5) + + ngx.sleep(2) + ngx.say("passed") + } + } +--- request +GET /t +--- timeout: 5 +--- response +passed +--- error_log eval +qr/getTopicRouteInfoFromNameserver return TOPIC_NOT_EXIST, No topic route info in name server for the topic: undefined_topic/ + + + +=== TEST 22: check nameserver_list via schema +--- config + location /t { + content_by_lua_block { + local data = { + { + input = { + nameserver_list = {}, + rocketmq_topic = "test", + key= "key1", + }, + }, + { + input = { + nameserver_list = { + ["127.0.0.1"] = "9876" + }, + rocketmq_topic = "test", + key= "key1", + }, + }, + { + input = { + nameserver_list = { + ["127.0.0.1"] = 0 + }, + rocketmq_topic = "test", + key= "key1", + }, + }, + { + input = { + nameserver_list = { + ["127.0.0.1"] = 65536 + }, + rocketmq_topic = "test", + key= "key1", + }, + }, + } + + local plugin = require("apisix.plugins.rocketmq-logger") + + local err_count = 0 + for i in ipairs(data) do + local ok, err = plugin.check_schema(data[i].input) + if not ok then + err_count = err_count + 1 + ngx.say(err) + end + end + + assert(err_count == #data) + } + } +--- request +GET /t +--- response_body +property "nameserver_list" validation failed: expect object to have at least 1 properties +property "nameserver_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): wrong type: expected integer, got string +property "nameserver_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 0 to be greater than 1 +property "nameserver_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 65536 to be smaller than 65535 +--- no_error_log +[error] + + + +=== TEST 23: rocketmq nameserver list info in log +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.127":9876 + }, + "rocketmq_topic" : "test2", + "producer_type": "sync", + "key" : "key1", + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + local http = require "resty.http" + local httpc = http.new() + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local res, err = httpc:request_uri(uri, {method = "GET"}) + } + } +--- request +GET /t +--- error_log_like eval +qr/create new rocketmq producer instance, nameserver_list: \[\{"port":9876,"host":"127.0.0.127"}]/ +qr/failed to send data to rocketmq topic: .*, nameserver_list: \{"127.0.0.127":9876}/ + + + +=== TEST 24: delete plugin metadata, tests would fail if run rocketmq-logger-log-format.t and plugin metadata is added +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/plugin_metadata/rocketmq-logger', + ngx.HTTP_DELETE, + nil, + [[{"action": "delete"}]]) + } + } +--- request +GET /t +--- response_body + +--- no_error_log +[error] + + + +=== TEST 25: set route(id: 1,include_req_body = true,include_req_body_expr = array) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "include_req_body": true, + "include_req_body_expr": [ + [ + "arg_name", + "==", + "qwerty" + ] + ], + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 26: hit route, expr eval success +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log eval +qr/send data to rocketmq: \{.*"body":"abcdef"/ +--- wait: 2 + + + +=== TEST 27: hit route,expr eval fail +--- request +POST /hello?name=zcxv +abcdef +--- response_body +hello world +--- no_error_log eval +qr/send data to rocketmq: \{.*"body":"abcdef"/ +--- wait: 2 + + + +=== TEST 28: check log schema(include_req_body) +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({ + rocketmq_topic = "test", + key = "key1", + nameserver_list = { + ["127.0.0.1"] = 3 + }, + include_req_body = true, + include_req_body_expr = { + {"bar", "<>", "foo"} + } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +failed to validate the 'include_req_body_expr' expression: invalid operator '<>' +done +--- no_error_log +[error] + + + +=== TEST 29: check log schema(include_resp_body) +--- config + location /t { + content_by_lua_block { + local plugin = require("apisix.plugins.rocketmq-logger") + local ok, err = plugin.check_schema({ + rocketmq_topic = "test", + key = "key1", + nameserver_list = { + ["127.0.0.1"] = 3 + }, + include_resp_body = true, + include_resp_body_expr = { + {"bar", "", "foo"} + } + }) + if not ok then + ngx.say(err) + end + ngx.say("done") + } + } +--- request +GET /t +--- response_body +failed to validate the 'include_resp_body_expr' expression: invalid operator '' +done +--- no_error_log +[error] + + + +=== TEST 30: set route(id: 1,include_resp_body = true,include_resp_body_expr = array) +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [=[{ + "plugins": { + "rocketmq-logger": { + "nameserver_list" : + { + "127.0.0.1":9876 + }, + "rocketmq_topic" : "test2", + "key" : "key1", + "timeout" : 1, + "include_resp_body": true, + "include_resp_body_expr": [ + [ + "arg_name", + "==", + "qwerty" + ] + ], + "batch_max_size": 1 + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]=] + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } + +--- request +GET /t +--- response_body +passed +--- no_error_log +[error] + + + +=== TEST 31: hit route, expr eval success +--- request +POST /hello?name=qwerty +abcdef +--- response_body +hello world +--- no_error_log +[error] +--- error_log eval +qr/send data to rocketmq: \{.*"body":"hello world\\n"/ +--- wait: 2 + + + +=== TEST 32: hit route,expr eval fail +--- request +POST /hello?name=zcxv +abcdef +--- response_body +hello world +--- no_error_log eval +qr/send data to rocketmq: \{.*"body":"hello world\\n"/ +--- wait: 2 From 773158e3142827414aabbae2f815e62ad3c7b1f2 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Wed, 1 Dec 2021 19:12:59 +0800 Subject: [PATCH 02/23] Update docker-compose.yml --- ci/pod/docker-compose.yml | 22 ++++++++++++++++++++++ 1 file changed, 22 insertions(+) diff --git a/ci/pod/docker-compose.yml b/ci/pod/docker-compose.yml index c71ab63d6dc6..f7c56152cce1 100644 --- a/ci/pod/docker-compose.yml +++ b/ci/pod/docker-compose.yml @@ -355,6 +355,27 @@ services: networks: apisix_net: + rocketmq_namesrv: + image: apacherocketmq/rocketmq:4.9.2 + container_name: rmqnamesrv + restart: unless-stopped + ports: + - "9876:9876" + networks: + rocketmq_net: + + rocketmq_broker: + image: apacherocketmq/rocketmq:4.9.2 + container_name: rmqbroker + restart: unless-stopped + ports: + - "10909:10909" + - "10911:10911" + - "10912:10912" + depends_on: + - rocketmq_namesrv + networks: + rocketmq_net: networks: apisix_net: @@ -362,3 +383,4 @@ networks: kafka_net: nacos_net: skywalk_net: + rocketmq_net: From fafa6055131a92197fb1d65bba9095e4557f3169 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Wed, 1 Dec 2021 19:15:27 +0800 Subject: [PATCH 03/23] Update rocketmq-logger.md --- docs/en/latest/plugins/rocketmq-logger.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/latest/plugins/rocketmq-logger.md b/docs/en/latest/plugins/rocketmq-logger.md index 3f660b4b1fa7..3e4fb96202bc 100644 --- a/docs/en/latest/plugins/rocketmq-logger.md +++ b/docs/en/latest/plugins/rocketmq-logger.md @@ -32,7 +32,7 @@ title: rocketmq-logger ## Name -`rocketmq-logger` is a plugin which works as a rocketmq client driver for the ngx_lua nginx module. +`rocketmq-logger` is a plugin which provides the ability to push requests log data as JSON objects to your external rocketmq clusters. This plugin provides the ability to push requests log data as JSON objects to your external rocketmq clusters. In case if you did not receive the log data don't worry give it some time it will automatically send the logs after the timer function expires in our Batch Processor. From 37cd4bbe7a3cebe5ceafdb045067f030401af136 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Wed, 1 Dec 2021 19:18:21 +0800 Subject: [PATCH 04/23] Update rocketmq-logger.md --- docs/zh/latest/plugins/rocketmq-logger.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/latest/plugins/rocketmq-logger.md b/docs/zh/latest/plugins/rocketmq-logger.md index 389813d29c88..5516f63c5671 100644 --- a/docs/zh/latest/plugins/rocketmq-logger.md +++ b/docs/zh/latest/plugins/rocketmq-logger.md @@ -32,7 +32,7 @@ title: rocketmq-logger ## 简介 -`rocketmq-logger` 是一个插件,可用作ngx_lua nginx 模块的 rocketmq 客户端驱动程序。 +`rocketmq-logger` 插件利用ngx_lua客户端能力,可推送JSON格式的日志到外部rocketmq集群。 它可以将接口请求日志以 JSON 的形式推送给外部 rocketmq 集群。如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。 From 00352bb99b5d9ff9a13e45983f4091707e5cbddc Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Wed, 1 Dec 2021 19:24:15 +0800 Subject: [PATCH 05/23] Update docker-compose.yml --- ci/pod/docker-compose.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/ci/pod/docker-compose.yml b/ci/pod/docker-compose.yml index f7c56152cce1..2bc9a52f613c 100644 --- a/ci/pod/docker-compose.yml +++ b/ci/pod/docker-compose.yml @@ -356,7 +356,7 @@ services: apisix_net: rocketmq_namesrv: - image: apacherocketmq/rocketmq:4.9.2 + image: apacherocketmq/rocketmq:4.9.1 container_name: rmqnamesrv restart: unless-stopped ports: @@ -365,7 +365,7 @@ services: rocketmq_net: rocketmq_broker: - image: apacherocketmq/rocketmq:4.9.2 + image: apacherocketmq/rocketmq:4.9.1 container_name: rmqbroker restart: unless-stopped ports: From 6c3ac6fc8ea470624ac3985a02a345af52ac71cc Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Wed, 1 Dec 2021 20:11:27 +0800 Subject: [PATCH 06/23] docker img version --- ci/linux-ci-init-service.sh | 6 ++++++ ci/pod/docker-compose.yml | 12 +++++++----- docs/zh/latest/plugins/rocketmq-logger.md | 2 +- rockspec/apisix-master-0.rockspec | 2 +- 4 files changed, 15 insertions(+), 7 deletions(-) diff --git a/ci/linux-ci-init-service.sh b/ci/linux-ci-init-service.sh index 83144b7b3b30..997d2e143886 100755 --- a/ci/linux-ci-init-service.sh +++ b/ci/linux-ci-init-service.sh @@ -25,3 +25,9 @@ docker pull openwhisk/action-nodejs-v14:nightly docker run --rm -d --name openwhisk -p 3233:3233 -p 3232:3232 -v /var/run/docker.sock:/var/run/docker.sock openwhisk/standalone:nightly docker exec -i openwhisk waitready docker exec -i openwhisk bash -c "wsk action update test <(echo 'function main(args){return {\"hello\":args.name || \"test\"}}') --kind nodejs:14" + +docker exec -i rmqnamesrv rm /home/rocketmq/rocketmq-4.6.0/conf/tools.yml +docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n namesrv:9876 -t test -c DefaultCluster +docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n namesrv:9876 -t test2 -c DefaultCluster +docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n namesrv:9876 -t test3 -c DefaultCluster +docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n namesrv:9876 -t test4 -c DefaultCluster diff --git a/ci/pod/docker-compose.yml b/ci/pod/docker-compose.yml index 2bc9a52f613c..0fe3f0a3eb46 100644 --- a/ci/pod/docker-compose.yml +++ b/ci/pod/docker-compose.yml @@ -355,17 +355,18 @@ services: networks: apisix_net: - rocketmq_namesrv: - image: apacherocketmq/rocketmq:4.9.1 + namesrv: + image: apacherocketmq/rocketmq:4.6.0 container_name: rmqnamesrv restart: unless-stopped ports: - "9876:9876" + command: sh mqnamesrv networks: rocketmq_net: - rocketmq_broker: - image: apacherocketmq/rocketmq:4.9.1 + broker: + image: apacherocketmq/rocketmq:4.6.0 container_name: rmqbroker restart: unless-stopped ports: @@ -373,7 +374,8 @@ services: - "10911:10911" - "10912:10912" depends_on: - - rocketmq_namesrv + - namesrv + command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf networks: rocketmq_net: diff --git a/docs/zh/latest/plugins/rocketmq-logger.md b/docs/zh/latest/plugins/rocketmq-logger.md index 5516f63c5671..322bed833adf 100644 --- a/docs/zh/latest/plugins/rocketmq-logger.md +++ b/docs/zh/latest/plugins/rocketmq-logger.md @@ -32,7 +32,7 @@ title: rocketmq-logger ## 简介 -`rocketmq-logger` 插件利用ngx_lua客户端能力,可推送JSON格式的日志到外部rocketmq集群。 +`rocketmq-logger` 插件利用ngx_lua客户端能力,可推送JSON格式的请求日志到外部rocketmq集群。 它可以将接口请求日志以 JSON 的形式推送给外部 rocketmq 集群。如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。 diff --git a/rockspec/apisix-master-0.rockspec b/rockspec/apisix-master-0.rockspec index 4b62943b371e..96f68f467ad4 100644 --- a/rockspec/apisix-master-0.rockspec +++ b/rockspec/apisix-master-0.rockspec @@ -72,7 +72,7 @@ dependencies = { "api7-snowflake = 2.0-1", "inspect == 3.1.1", "lualdap = 1.2.6-1", - "lua-resty-rocketmq = 0.2.1-0", + "lua-resty-rocketmq = 0.2.1-1", } build = { From ecb655e0db25799b0233c00933b387313d09e682 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 10:28:30 +0800 Subject: [PATCH 07/23] Update rocketmq-logger.lua --- apisix/plugins/rocketmq-logger.lua | 35 +++++++++++------------------- 1 file changed, 13 insertions(+), 22 deletions(-) diff --git a/apisix/plugins/rocketmq-logger.lua b/apisix/plugins/rocketmq-logger.lua index 662caea2a798..2937700e2385 100644 --- a/apisix/plugins/rocketmq-logger.lua +++ b/apisix/plugins/rocketmq-logger.lua @@ -43,23 +43,21 @@ local schema = { enum = {"default", "origin"}, }, nameserver_list = { - type = "object", - minProperties = 1, - patternProperties = { - [".*"] = { - description = "the port of rocketmq nameserver", - type = "integer", - minimum = 1, - maximum = 65535, - }, - }, + type = "array", + minItems = 1, + items = { + type = "array", + items = { + type = "string" + } + } }, - rocketmq_topic = {type = "string"}, + topic = {type = "string"}, key = {type = "string"}, tag = {type = "string"}, timeout = {type = "integer", minimum = 1, default = 3}, use_tls = {type = "boolean", default = false}, - access_key = {type = "string", default = ""}, + access_key = {type = "string", default = .""}, secret_key = {type = "string", default = ""}, name = {type = "string", default = "rocketmq logger"}, max_retry_count = {type = "integer", minimum = 0, default = 0}, @@ -89,7 +87,7 @@ local schema = { } }, }, - required = {"nameserver_list", "rocketmq_topic"} + required = {"nameserver_list", "topic"} } local metadata_schema = { @@ -155,7 +153,7 @@ end local function send_rocketmq_data(conf, log_message, prod) - local result, err = prod:send(conf.rocketmq_topic, log_message, conf.tag, conf.key) + local result, err = prod:send(conf.topic, log_message, conf.tag, conf.key) if not result then return false, "failed to send data to rocketmq topic: " .. err .. ", nameserver_list: " .. core.json.encode(conf.nameserver_list) @@ -205,13 +203,6 @@ function _M.log(conf, ctx) end -- reuse producer via lrucache to avoid unbalanced partitions of messages in rocketmq - local nameserver_list = core.table.new(core.table.nkeys(conf.nameserver_list), 0) - - for host, port in pairs(conf.nameserver_list) do - local nameserver = host .. ':' .. port - core.table.insert(nameserver_list, nameserver) - end - local producer_config = { timeout = conf.timeout * 1000, use_tls = conf.use_tls, @@ -220,7 +211,7 @@ function _M.log(conf, ctx) } local prod, err = core.lrucache.plugin_ctx(lrucache, ctx, nil, create_producer, - nameserver_list, producer_config) + conf.nameserver_list, producer_config) if err then return nil, "failed to create the rocketmq producer: " .. err end From b8efe877cd9e723bd31ac9bf04c2b7500d030d52 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 10:30:42 +0800 Subject: [PATCH 08/23] Update rocketmq-logger-log-format.t --- t/plugin/rocketmq-logger-log-format.t | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/t/plugin/rocketmq-logger-log-format.t b/t/plugin/rocketmq-logger-log-format.t index 1c4de982b21b..b3a364b14c9c 100644 --- a/t/plugin/rocketmq-logger-log-format.t +++ b/t/plugin/rocketmq-logger-log-format.t @@ -76,11 +76,8 @@ passed [[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : - { - "127.0.0.1":9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", "key" : "key1", "tag" : "tag1", "timeout" : 1, From 8c08ee8033fe8928118fa694fc348507d2281521 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 10:37:51 +0800 Subject: [PATCH 09/23] Update rocketmq-logger.t --- t/plugin/rocketmq-logger.t | 202 ++++++++----------------------------- 1 file changed, 43 insertions(+), 159 deletions(-) diff --git a/t/plugin/rocketmq-logger.t b/t/plugin/rocketmq-logger.t index 4ec554a763eb..3ff8cdb99c08 100644 --- a/t/plugin/rocketmq-logger.t +++ b/t/plugin/rocketmq-logger.t @@ -29,10 +29,10 @@ __DATA__ content_by_lua_block { local plugin = require("apisix.plugins.rocketmq-logger") local ok, err = plugin.check_schema({ - rocketmq_topic = "test", + topic = "test", key = "key1", nameserver_list = { - ["127.0.0.1"] = 3 + "127.0.0.1:3" } }) if not ok then @@ -79,7 +79,7 @@ done local plugin = require("apisix.plugins.rocketmq-logger") local ok, err = plugin.check_schema({ nameserver_list = { - ["127.0.0.1"] = 3000 + "127.0.0.1:3000" }, timeout = "10", rocketmq_topic ="test", @@ -111,10 +111,7 @@ done [[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : - { - "127.0.0.1":9876 - }, + "nameserver_list" : [ "127.0.0.1:9876" ], "rocketmq_topic" : "test2", "key" : "key1", "timeout" : 1, @@ -134,10 +131,7 @@ done "value": { "plugins": { "rocketmq-logger": { - "nameserver_list" : - { - "127.0.0.1":9876 - }, + "nameserver_list" : [ "127.0.0.1:9876" ], "rocketmq_topic" : "test2", "key" : "key1", "timeout" : 1, @@ -193,12 +187,8 @@ hello world [[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : - { - "127.0.0.1":9876, - "127.0.0.1":9877 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9877" ], + "topic" : "test2", "producer_type": "sync", "key" : "key1", "batch_max_size": 1 @@ -217,12 +207,8 @@ hello world "value": { "plugins": { "rocketmq-logger": { - "nameserver_list" : - { - "127.0.0.1":9876, - "127.0.0.1":9877 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9877" ], + "topic" : "test2", "producer_type": "sync", "key" : "key1", "batch_max_size": 1 @@ -270,10 +256,8 @@ failed to send data to rocketmq topic [[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : { - "127.0.0.1":9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", "key" : "key1", "timeout" : 1, "batch_max_size": 1, @@ -334,10 +318,8 @@ abcdef [[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : { - "127.0.0.1":9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", "key" : "key1", "timeout" : 1, "batch_max_size": 1, @@ -396,10 +378,8 @@ connection: close [[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : { - "127.0.0.1":9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", "key" : "key1", "timeout" : 1, "batch_max_size": 1, @@ -454,11 +434,8 @@ qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/ [[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : - { - "127.0.0.1":9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", "timeout" : 1, "batch_max_size": 1 } @@ -476,11 +453,8 @@ qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/ "value": { "plugins": { "rocketmq-logger": { - "nameserver_list" : - { - "127.0.0.1":9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", "timeout" : 1, "batch_max_size": 1 } @@ -534,10 +508,8 @@ hello world [[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : { - "127.0.0.1":9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", "timeout" : 1, "batch_max_size": 1, "include_req_body": false @@ -591,10 +563,8 @@ qr/send data to rocketmq: \{.*"upstream":"127.0.0.1:1980"/ [[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : { - "127.0.0.1": 9876 - }, - "rocketmq_topic" : "test3", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test3", "timeout" : 1, "batch_max_size": 1, "include_req_body": false @@ -634,10 +604,8 @@ passed [[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : { - "127.0.0.1": 9876 - }, - "rocketmq_topic" : "test3", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test3", "producer_type": "sync", "timeout" : 1, "batch_max_size": 1, @@ -685,10 +653,8 @@ qr/queue: 2/] [[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : { - "127.0.0.1": 9876 - }, - "rocketmq_topic" : "test3", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test3", "producer_type": "async", "timeout" : 1, "batch_max_size": 1, @@ -754,10 +720,8 @@ qr/queue: 2/] ngx.HTTP_PATCH, [[{ "rocketmq-logger": { - "nameserver_list" : { - "127.0.0.1": 9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", "timeout" : 1, "batch_max_size": 1, "include_req_body": false @@ -778,10 +742,8 @@ qr/queue: 2/] ngx.HTTP_PATCH, [[{ "rocketmq-logger": { - "nameserver_list" : { - "127.0.0.1": 19876 - }, - "rocketmq_topic" : "test4", + "nameserver_list" : [ "127.0.0.1:19876" ], + "topic" : "test4", "timeout" : 1, "batch_max_size": 1, "include_req_body": false @@ -825,10 +787,8 @@ qr/not found topic/ ngx.HTTP_PATCH, [[{ "rocketmq-logger": { - "nameserver_list" : { - "127.0.0.1": 9876 - }, - "rocketmq_topic" : "undefined_topic", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "undefined_topic", "timeout" : 1, "batch_max_size": 1, "include_req_body": false @@ -859,73 +819,6 @@ qr/getTopicRouteInfoFromNameserver return TOPIC_NOT_EXIST, No topic route info i -=== TEST 22: check nameserver_list via schema ---- config - location /t { - content_by_lua_block { - local data = { - { - input = { - nameserver_list = {}, - rocketmq_topic = "test", - key= "key1", - }, - }, - { - input = { - nameserver_list = { - ["127.0.0.1"] = "9876" - }, - rocketmq_topic = "test", - key= "key1", - }, - }, - { - input = { - nameserver_list = { - ["127.0.0.1"] = 0 - }, - rocketmq_topic = "test", - key= "key1", - }, - }, - { - input = { - nameserver_list = { - ["127.0.0.1"] = 65536 - }, - rocketmq_topic = "test", - key= "key1", - }, - }, - } - - local plugin = require("apisix.plugins.rocketmq-logger") - - local err_count = 0 - for i in ipairs(data) do - local ok, err = plugin.check_schema(data[i].input) - if not ok then - err_count = err_count + 1 - ngx.say(err) - end - end - - assert(err_count == #data) - } - } ---- request -GET /t ---- response_body -property "nameserver_list" validation failed: expect object to have at least 1 properties -property "nameserver_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): wrong type: expected integer, got string -property "nameserver_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 0 to be greater than 1 -property "nameserver_list" validation failed: failed to validate 127.0.0.1 (matching ".*"): expected 65536 to be smaller than 65535 ---- no_error_log -[error] - - - === TEST 23: rocketmq nameserver list info in log --- config location /t { @@ -936,11 +829,8 @@ property "nameserver_list" validation failed: failed to validate 127.0.0.1 (matc [[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : - { - "127.0.0.127":9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", "producer_type": "sync", "key" : "key1", "batch_max_size": 1 @@ -1003,11 +893,8 @@ GET /t [=[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : - { - "127.0.0.1":9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", "key" : "key1", "timeout" : 1, "include_req_body": true, @@ -1078,10 +965,10 @@ qr/send data to rocketmq: \{.*"body":"abcdef"/ content_by_lua_block { local plugin = require("apisix.plugins.rocketmq-logger") local ok, err = plugin.check_schema({ - rocketmq_topic = "test", + topic = "test", key = "key1", nameserver_list = { - ["127.0.0.1"] = 3 + "127.0.0.1:3" }, include_req_body = true, include_req_body_expr = { @@ -1110,10 +997,10 @@ done content_by_lua_block { local plugin = require("apisix.plugins.rocketmq-logger") local ok, err = plugin.check_schema({ - rocketmq_topic = "test", + topic = "test", key = "key1", nameserver_list = { - ["127.0.0.1"] = 3 + "127.0.0.1:3" }, include_resp_body = true, include_resp_body_expr = { @@ -1146,11 +1033,8 @@ done [=[{ "plugins": { "rocketmq-logger": { - "nameserver_list" : - { - "127.0.0.1":9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", "key" : "key1", "timeout" : 1, "include_resp_body": true, From f5a40683fe4632833812c87675ec02eda1e5e1c0 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 10:42:23 +0800 Subject: [PATCH 10/23] fix --- t/plugin/rocketmq-logger.t | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/t/plugin/rocketmq-logger.t b/t/plugin/rocketmq-logger.t index 3ff8cdb99c08..5a0f6767fb5f 100644 --- a/t/plugin/rocketmq-logger.t +++ b/t/plugin/rocketmq-logger.t @@ -55,7 +55,7 @@ done location /t { content_by_lua_block { local plugin = require("apisix.plugins.rocketmq-logger") - local ok, err = plugin.check_schema({rocketmq_topic = "test", key= "key1"}) + local ok, err = plugin.check_schema({topic = "test", key= "key1"}) if not ok then ngx.say(err) end @@ -82,7 +82,7 @@ done "127.0.0.1:3000" }, timeout = "10", - rocketmq_topic ="test", + topic ="test", key= "key1" }) if not ok then @@ -112,7 +112,7 @@ done "plugins": { "rocketmq-logger": { "nameserver_list" : [ "127.0.0.1:9876" ], - "rocketmq_topic" : "test2", + "topic" : "test2", "key" : "key1", "timeout" : 1, "batch_max_size": 1 @@ -132,7 +132,7 @@ done "plugins": { "rocketmq-logger": { "nameserver_list" : [ "127.0.0.1:9876" ], - "rocketmq_topic" : "test2", + "topic" : "test2", "key" : "key1", "timeout" : 1, "batch_max_size": 1 From 2dc7f54b02fa067457b4b63e2fada8e0aeb34920 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 10:45:59 +0800 Subject: [PATCH 11/23] update document --- docs/en/latest/plugins/rocketmq-logger.md | 9 +++------ docs/zh/latest/plugins/rocketmq-logger.md | 9 +++------ 2 files changed, 6 insertions(+), 12 deletions(-) diff --git a/docs/en/latest/plugins/rocketmq-logger.md b/docs/en/latest/plugins/rocketmq-logger.md index 3e4fb96202bc..9337f4f5a4d4 100644 --- a/docs/en/latest/plugins/rocketmq-logger.md +++ b/docs/en/latest/plugins/rocketmq-logger.md @@ -44,7 +44,7 @@ For more info on Batch-Processor in Apache APISIX please refer. | Name | Type | Requirement | Default | Valid | Description | | ---------------- | ------- | ----------- | -------------- | ------- | ---------------------------------------------------------------------------------------- | | nameserver_list | object | required | | | An array of rocketmq nameservers. | -| rocketmq_topic | string | required | | | Target topic to push data. | +| topic | string | required | | | Target topic to push data. | | key | string | optional | | | Keys of messages to send. | | tag | string | optional | | | Tags of messages to send. | | timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to send data. | @@ -148,11 +148,8 @@ curl http://127.0.0.1:9080/apisix/admin/routes/5 -H 'X-API-KEY: edd1c9f034335f13 { "plugins": { "rocketmq-logger": { - "nameserver_list" : - { - "127.0.0.1":9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", "batch_max_size": 1, "name": "rocketmq logger" } diff --git a/docs/zh/latest/plugins/rocketmq-logger.md b/docs/zh/latest/plugins/rocketmq-logger.md index 322bed833adf..5283847837a8 100644 --- a/docs/zh/latest/plugins/rocketmq-logger.md +++ b/docs/zh/latest/plugins/rocketmq-logger.md @@ -44,7 +44,7 @@ title: rocketmq-logger | 名称 | 类型 | 必选项 | 默认值 | 有效值 | 描述 | | ---------------- | ------- | ------ | -------------- | ------- | ------------------------------------------------ | | nameserver_list | object | 必须 | | | 要推送的 rocketmq 的 nameserver 列表。 | -| rocketmq_topic | string | 必须 | | | 要推送的 topic。 | +| topic | string | 必须 | | | 要推送的 topic。 | | key | string | 可选 | | | 发送消息的keys。 | | tag | string | 可选 | | | 发送消息的tags。 | | timeout | integer | 可选 | 3 | [1,...] | 发送数据的超时时间。 | @@ -148,11 +148,8 @@ curl http://127.0.0.1:9080/apisix/admin/routes/1 -H 'X-API-KEY: edd1c9f034335f13 { "plugins": { "rocketmq-logger": { - "nameserver_list" : - { - "127.0.0.1":9876 - }, - "rocketmq_topic" : "test2", + "nameserver_list" : [ "127.0.0.1:9876" ], + "topic" : "test2", } }, "upstream": { From 2f70923f02b068d7f30310d307a666ae811c4b94 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 10:55:46 +0800 Subject: [PATCH 12/23] typo --- apisix/plugins/rocketmq-logger.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/plugins/rocketmq-logger.lua b/apisix/plugins/rocketmq-logger.lua index 2937700e2385..3f8f04741034 100644 --- a/apisix/plugins/rocketmq-logger.lua +++ b/apisix/plugins/rocketmq-logger.lua @@ -57,7 +57,7 @@ local schema = { tag = {type = "string"}, timeout = {type = "integer", minimum = 1, default = 3}, use_tls = {type = "boolean", default = false}, - access_key = {type = "string", default = .""}, + access_key = {type = "string", default = ""}, secret_key = {type = "string", default = ""}, name = {type = "string", default = "rocketmq logger"}, max_retry_count = {type = "integer", minimum = 0, default = 0}, From a52414ea55d5529874e69ce5522ad4f9122ab18d Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 10:58:39 +0800 Subject: [PATCH 13/23] lint --- apisix/plugins/rocketmq-logger.lua | 1 - 1 file changed, 1 deletion(-) diff --git a/apisix/plugins/rocketmq-logger.lua b/apisix/plugins/rocketmq-logger.lua index 3f8f04741034..ecfe5088e3ce 100644 --- a/apisix/plugins/rocketmq-logger.lua +++ b/apisix/plugins/rocketmq-logger.lua @@ -21,7 +21,6 @@ local acl_rpchook = require("resty.rocketmq.acl_rpchook") local batch_processor = require("apisix.utils.batch-processor") local plugin = require("apisix.plugin") -local pairs = pairs local type = type local ipairs = ipairs local plugin_name = "rocketmq-logger" From 256c2b241b425167758d1522da71bec9dc86c476 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 11:07:22 +0800 Subject: [PATCH 14/23] reindex --- t/plugin/rocketmq-logger.t | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/t/plugin/rocketmq-logger.t b/t/plugin/rocketmq-logger.t index 5a0f6767fb5f..8e0bbc51d9c5 100644 --- a/t/plugin/rocketmq-logger.t +++ b/t/plugin/rocketmq-logger.t @@ -819,7 +819,7 @@ qr/getTopicRouteInfoFromNameserver return TOPIC_NOT_EXIST, No topic route info i -=== TEST 23: rocketmq nameserver list info in log +=== TEST 22: rocketmq nameserver list info in log --- config location /t { content_by_lua_block { @@ -863,7 +863,7 @@ qr/failed to send data to rocketmq topic: .*, nameserver_list: \{"127.0.0.127":9 -=== TEST 24: delete plugin metadata, tests would fail if run rocketmq-logger-log-format.t and plugin metadata is added +=== TEST 23: delete plugin metadata, tests would fail if run rocketmq-logger-log-format.t and plugin metadata is added --- config location /t { content_by_lua_block { @@ -883,7 +883,7 @@ GET /t -=== TEST 25: set route(id: 1,include_req_body = true,include_req_body_expr = array) +=== TEST 24: set route(id: 1,include_req_body = true,include_req_body_expr = array) --- config location /t { content_by_lua_block { @@ -933,7 +933,7 @@ passed -=== TEST 26: hit route, expr eval success +=== TEST 25: hit route, expr eval success --- request POST /hello?name=qwerty abcdef @@ -947,7 +947,7 @@ qr/send data to rocketmq: \{.*"body":"abcdef"/ -=== TEST 27: hit route,expr eval fail +=== TEST 26: hit route,expr eval fail --- request POST /hello?name=zcxv abcdef @@ -959,7 +959,7 @@ qr/send data to rocketmq: \{.*"body":"abcdef"/ -=== TEST 28: check log schema(include_req_body) +=== TEST 27: check log schema(include_req_body) --- config location /t { content_by_lua_block { @@ -991,7 +991,7 @@ done -=== TEST 29: check log schema(include_resp_body) +=== TEST 28: check log schema(include_resp_body) --- config location /t { content_by_lua_block { @@ -1023,7 +1023,7 @@ done -=== TEST 30: set route(id: 1,include_resp_body = true,include_resp_body_expr = array) +=== TEST 29: set route(id: 1,include_resp_body = true,include_resp_body_expr = array) --- config location /t { content_by_lua_block { @@ -1073,7 +1073,7 @@ passed -=== TEST 31: hit route, expr eval success +=== TEST 30: hit route, expr eval success --- request POST /hello?name=qwerty abcdef @@ -1087,7 +1087,7 @@ qr/send data to rocketmq: \{.*"body":"hello world\\n"/ -=== TEST 32: hit route,expr eval fail +=== TEST 31: hit route,expr eval fail --- request POST /hello?name=zcxv abcdef From 52f559684fe50d514a7301b06801e113faa6c2b0 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 12:27:10 +0800 Subject: [PATCH 15/23] fix --- apisix/plugins/rocketmq-logger.lua | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/apisix/plugins/rocketmq-logger.lua b/apisix/plugins/rocketmq-logger.lua index ecfe5088e3ce..76ba81e3229e 100644 --- a/apisix/plugins/rocketmq-logger.lua +++ b/apisix/plugins/rocketmq-logger.lua @@ -45,10 +45,7 @@ local schema = { type = "array", minItems = 1, items = { - type = "array", - items = { - type = "string" - } + type = "string" } }, topic = {type = "string"}, From cb71aae8ce5eb6ab163be38e5fbf4b3fe98d1b05 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 14:35:04 +0800 Subject: [PATCH 16/23] update --- apisix/plugins/rocketmq-logger.lua | 2 +- ci/linux-ci-init-service.sh | 8 ++++---- ci/pod/docker-compose.yml | 8 ++++---- docs/en/latest/plugins/rocketmq-logger.md | 2 +- docs/zh/latest/plugins/rocketmq-logger.md | 12 ++++++------ 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/apisix/plugins/rocketmq-logger.lua b/apisix/plugins/rocketmq-logger.lua index 76ba81e3229e..0595f2098c6c 100644 --- a/apisix/plugins/rocketmq-logger.lua +++ b/apisix/plugins/rocketmq-logger.lua @@ -25,8 +25,8 @@ local type = type local ipairs = ipairs local plugin_name = "rocketmq-logger" local stale_timer_running = false -local timer_at = ngx.timer.at local ngx = ngx +local timer_at = ngx.timer.at local buffers = {} local lrucache = core.lrucache.new({ diff --git a/ci/linux-ci-init-service.sh b/ci/linux-ci-init-service.sh index 997d2e143886..0c3ff5d03096 100755 --- a/ci/linux-ci-init-service.sh +++ b/ci/linux-ci-init-service.sh @@ -27,7 +27,7 @@ docker exec -i openwhisk waitready docker exec -i openwhisk bash -c "wsk action update test <(echo 'function main(args){return {\"hello\":args.name || \"test\"}}') --kind nodejs:14" docker exec -i rmqnamesrv rm /home/rocketmq/rocketmq-4.6.0/conf/tools.yml -docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n namesrv:9876 -t test -c DefaultCluster -docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n namesrv:9876 -t test2 -c DefaultCluster -docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n namesrv:9876 -t test3 -c DefaultCluster -docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n namesrv:9876 -t test4 -c DefaultCluster +docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n rocketmq_namesrv:9876 -t test -c DefaultCluster +docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n rocketmq_namesrv:9876 -t test2 -c DefaultCluster +docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n rocketmq_namesrv:9876 -t test3 -c DefaultCluster +docker exec -i rmqnamesrv /home/rocketmq/rocketmq-4.6.0/bin/mqadmin updateTopic -n rocketmq_namesrv:9876 -t test4 -c DefaultCluster diff --git a/ci/pod/docker-compose.yml b/ci/pod/docker-compose.yml index 0fe3f0a3eb46..2dedaf9dff80 100644 --- a/ci/pod/docker-compose.yml +++ b/ci/pod/docker-compose.yml @@ -355,7 +355,7 @@ services: networks: apisix_net: - namesrv: + rocketmq_namesrv: image: apacherocketmq/rocketmq:4.6.0 container_name: rmqnamesrv restart: unless-stopped @@ -365,7 +365,7 @@ services: networks: rocketmq_net: - broker: + rocketmq_broker: image: apacherocketmq/rocketmq:4.6.0 container_name: rmqbroker restart: unless-stopped @@ -374,8 +374,8 @@ services: - "10911:10911" - "10912:10912" depends_on: - - namesrv - command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf + - rocketmq_namesrv + command: sh mqbroker -n rocketmq_namesrv:9876 -c ../conf/broker.conf networks: rocketmq_net: diff --git a/docs/en/latest/plugins/rocketmq-logger.md b/docs/en/latest/plugins/rocketmq-logger.md index 9337f4f5a4d4..48557a55898f 100644 --- a/docs/en/latest/plugins/rocketmq-logger.md +++ b/docs/en/latest/plugins/rocketmq-logger.md @@ -34,7 +34,7 @@ title: rocketmq-logger `rocketmq-logger` is a plugin which provides the ability to push requests log data as JSON objects to your external rocketmq clusters. -This plugin provides the ability to push requests log data as JSON objects to your external rocketmq clusters. In case if you did not receive the log data don't worry give it some time it will automatically send the logs after the timer function expires in our Batch Processor. + In case if you did not receive the log data don't worry give it some time it will automatically send the logs after the timer function expires in our Batch Processor. For more info on Batch-Processor in Apache APISIX please refer. [Batch-Processor](../batch-processor.md) diff --git a/docs/zh/latest/plugins/rocketmq-logger.md b/docs/zh/latest/plugins/rocketmq-logger.md index 5283847837a8..51ab22d0d391 100644 --- a/docs/zh/latest/plugins/rocketmq-logger.md +++ b/docs/zh/latest/plugins/rocketmq-logger.md @@ -32,9 +32,9 @@ title: rocketmq-logger ## 简介 -`rocketmq-logger` 插件利用ngx_lua客户端能力,可推送JSON格式的请求日志到外部rocketmq集群。 +`rocketmq-logger` 插件可以将接口请求日志以 JSON 的形式推送给外部 rocketmq 集群。 -它可以将接口请求日志以 JSON 的形式推送给外部 rocketmq 集群。如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。 +如果在短时间内没有收到日志数据,请放心,它会在我们的批处理处理器中的计时器功能到期后自动发送日志。 有关 Apache APISIX 中 Batch-Processor 的更多信息,请参考。 [Batch-Processor](../batch-processor.md) @@ -67,7 +67,7 @@ title: rocketmq-logger - **default**: - ```json + ```json { "upstream": "127.0.0.1:1980", "start_time": 1619414294760, @@ -106,18 +106,18 @@ title: rocketmq-logger }, "latency": 0 } - ``` +``` - **origin**: - ```http +```http GET /hello?ab=cd HTTP/1.1 host: localhost content-length: 6 connection: close abcdef - ``` +``` ## 工作原理 From cfbc7c7984eca648405b049eaa625ea87e59dc8a Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 14:41:22 +0800 Subject: [PATCH 17/23] update again --- docs/zh/latest/plugins/rocketmq-logger.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zh/latest/plugins/rocketmq-logger.md b/docs/zh/latest/plugins/rocketmq-logger.md index 51ab22d0d391..6d4f2909780b 100644 --- a/docs/zh/latest/plugins/rocketmq-logger.md +++ b/docs/zh/latest/plugins/rocketmq-logger.md @@ -67,7 +67,7 @@ title: rocketmq-logger - **default**: - ```json +```json { "upstream": "127.0.0.1:1980", "start_time": 1619414294760, From bbf98171b1be90b7cc51dafefc14681b3b4f59e4 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 14:43:48 +0800 Subject: [PATCH 18/23] update --- docs/en/latest/plugins/rocketmq-logger.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/en/latest/plugins/rocketmq-logger.md b/docs/en/latest/plugins/rocketmq-logger.md index 48557a55898f..a6c93842a525 100644 --- a/docs/en/latest/plugins/rocketmq-logger.md +++ b/docs/en/latest/plugins/rocketmq-logger.md @@ -67,7 +67,7 @@ For more info on Batch-Processor in Apache APISIX please refer. - **default**: - ```json +```json { "upstream": "127.0.0.1:1980", "start_time": 1619414294760, @@ -106,18 +106,18 @@ For more info on Batch-Processor in Apache APISIX please refer. }, "latency": 0 } - ``` +``` - **origin**: - ```http +```http GET /hello?ab=cd HTTP/1.1 host: localhost content-length: 6 connection: close abcdef - ``` +``` ## Info From 70977f94a75f7631ee93f5a1217600d1f4947bfa Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 20:35:02 +0800 Subject: [PATCH 19/23] remove debug code --- apisix/plugins/rocketmq-logger.lua | 2 -- 1 file changed, 2 deletions(-) diff --git a/apisix/plugins/rocketmq-logger.lua b/apisix/plugins/rocketmq-logger.lua index 0595f2098c6c..d6efae3a031e 100644 --- a/apisix/plugins/rocketmq-logger.lua +++ b/apisix/plugins/rocketmq-logger.lua @@ -170,8 +170,6 @@ function _M.log(conf, ctx) local entry if conf.meta_format == "origin" then entry = log_util.get_req_original(ctx, conf) - -- core.log.info("origin entry: ", entry) - else local metadata = plugin.plugin_metadata(plugin_name) core.log.info("metadata: ", core.json.delay_encode(metadata)) From 692ed04de1e515d8fb608fdb2e36b54328b9813f Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Thu, 2 Dec 2021 20:45:19 +0800 Subject: [PATCH 20/23] doc --- docs/en/latest/plugins/rocketmq-logger.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/latest/plugins/rocketmq-logger.md b/docs/en/latest/plugins/rocketmq-logger.md index a6c93842a525..6df5d4d07536 100644 --- a/docs/en/latest/plugins/rocketmq-logger.md +++ b/docs/en/latest/plugins/rocketmq-logger.md @@ -50,7 +50,7 @@ For more info on Batch-Processor in Apache APISIX please refer. | timeout | integer | optional | 3 | [1,...] | Timeout for the upstream to send data. | | use_tls | boolean | optional | false | | Whether to open TLS | | access_key | string | optional | "" | | access key for ACL, empty string means disable ACL. | -| secret_key | string | optional | "" | | secret key for ACL。 | +| secret_key | string | optional | "" | | secret key for ACL. | | name | string | optional | "rocketmq logger" | | A unique identifier to identity the batch processor. | | meta_format | enum | optional | "default" | ["default","origin"] | `default`: collect the request information with default JSON way. `origin`: collect the request information with original HTTP request. [example](#examples-of-meta_format)| | batch_max_size | integer | optional | 1000 | [1,...] | Set the maximum number of logs sent in each batch. When the number of logs reaches the set maximum, all logs will be automatically pushed to the `rocketmq` service. | From 4f34a945ab88a39967babe6460ddeea7b65d7075 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Fri, 3 Dec 2021 09:34:02 +0800 Subject: [PATCH 21/23] update doc --- docs/en/latest/plugins/rocketmq-logger.md | 8 ++++---- docs/zh/latest/plugins/rocketmq-logger.md | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/en/latest/plugins/rocketmq-logger.md b/docs/en/latest/plugins/rocketmq-logger.md index 6df5d4d07536..f17968c4dd51 100644 --- a/docs/en/latest/plugins/rocketmq-logger.md +++ b/docs/en/latest/plugins/rocketmq-logger.md @@ -133,10 +133,10 @@ In case of errors, returns `nil` with a string describing the error (`buffer ove Specify the nameservers of the external rocketmq servers as below sample. ```json -{ - "127.0.0.1":9876, - "127.0.0.2":9876 -} +[ + "127.0.0.1:9876", + "127.0.0.2:9876" +] ``` ## How To Enable diff --git a/docs/zh/latest/plugins/rocketmq-logger.md b/docs/zh/latest/plugins/rocketmq-logger.md index 6d4f2909780b..f61c0b4acf9a 100644 --- a/docs/zh/latest/plugins/rocketmq-logger.md +++ b/docs/zh/latest/plugins/rocketmq-logger.md @@ -133,10 +133,10 @@ title: rocketmq-logger 配置多个nameserver地址如下: ```json -{ - "127.0.0.1":9876, - "127.0.0.2":9876 -} +[ + "127.0.0.1:9876", + "127.0.0.2:9876" +] ``` ## 如何启用 From dde100b11057be3b655c5aa3c06d9660b3d40cbe Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Mon, 6 Dec 2021 09:43:21 +0800 Subject: [PATCH 22/23] fix mem leak in remove_stale_objects --- apisix/plugins/rocketmq-logger.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apisix/plugins/rocketmq-logger.lua b/apisix/plugins/rocketmq-logger.lua index d6efae3a031e..bf7adc5724e1 100644 --- a/apisix/plugins/rocketmq-logger.lua +++ b/apisix/plugins/rocketmq-logger.lua @@ -22,7 +22,7 @@ local batch_processor = require("apisix.utils.batch-processor") local plugin = require("apisix.plugin") local type = type -local ipairs = ipairs +local pairs = pairs local plugin_name = "rocketmq-logger" local stale_timer_running = false local ngx = ngx @@ -121,7 +121,7 @@ local function remove_stale_objects(premature) return end - for key, batch in ipairs(buffers) do + for key, batch in pairs(buffers) do if #batch.entry_buffer.entries == 0 and #batch.batch_to_process == 0 then core.log.warn("removing batch processor stale object, conf: ", core.json.delay_encode(key)) From 0db7d8c15dd6594ab2e742b11b98ac5479d7bda5 Mon Sep 17 00:00:00 2001 From: yuz10 <845238369@qq.com> Date: Mon, 6 Dec 2021 10:02:50 +0800 Subject: [PATCH 23/23] Update rocketmq-logger.t --- t/plugin/rocketmq-logger.t | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/t/plugin/rocketmq-logger.t b/t/plugin/rocketmq-logger.t index 8e0bbc51d9c5..0ac81229a0cb 100644 --- a/t/plugin/rocketmq-logger.t +++ b/t/plugin/rocketmq-logger.t @@ -177,7 +177,7 @@ hello world -=== TEST 6: error log +=== TEST 6: unavailable nameserver --- config location /t { content_by_lua_block {