Skip to content

Commit

Permalink
feature: add Kafka Logger plugin. (apache#1312)
Browse files Browse the repository at this point in the history
  • Loading branch information
Ayeshmantha Perera authored and SaberMaster committed Jun 30, 2020
1 parent d789f4e commit 79b2339
Show file tree
Hide file tree
Showing 12 changed files with 643 additions and 4 deletions.
8 changes: 8 additions & 0 deletions .travis/linux_openresty_runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ before_install() {
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
docker pull redis:3.0-alpine
docker run --rm -itd -p 6379:6379 --name apisix_redis redis:3.0-alpine
# spin up kafka cluster for tests (1 zookeper and 1 kafka instance)
docker pull bitnami/zookeeper:3.6.0
docker pull bitnami/kafka:latest
docker network create kafka-net --driver bridge
docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
sleep 5
docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
}

do_install() {
Expand Down
8 changes: 8 additions & 0 deletions .travis/linux_tengine_runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ before_install() {
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
docker pull redis:3.0-alpine
docker run --rm -itd -p 6379:6379 --name apisix_redis redis:3.0-alpine
# spin up kafka cluster for tests (1 zookeper and 1 kafka instance)
docker pull bitnami/zookeeper:3.6.0
docker pull bitnami/kafka:latest
docker network create kafka-net --driver bridge
docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
sleep 5
docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
}

tengine_install() {
Expand Down
1 change: 1 addition & 0 deletions conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ plugins: # plugin list
- proxy-cache
- tcp-logger
- proxy-mirror
- kafka-logger

stream_plugins:
- mqtt-proxy
5 changes: 3 additions & 2 deletions doc/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,10 @@ Plugins
* [response-rewrite](plugins/response-rewrite.md): Set customized response status code, body and header to the client.
* [fault-injection](plugins/fault-injection.md): The specified response body, response code, and response time can be returned, which provides processing capabilities in different failure scenarios, such as service failure, service overload, and high service delay.
* [proxy-cache](plugins/proxy-cache.md): Provides the ability to cache upstream response data.
* [tcp-logger](plugins/tcp-logger.md): Log requests to TCP servers
* [udp-logger](plugins/udp-logger.md): Log requests to UDP servers
* [tcp-logger](plugins/tcp-logger.md): Log requests to TCP servers.
* [udp-logger](plugins/udp-logger.md): Log requests to UDP servers.
* [proxy-mirror](plugins/proxy-mirror.md): Provides the ability to mirror client requests.
* [kafka-logger](plugins/kafka-logger.md): Log requests to External Kafka servers.

Deploy to the Cloud
=======
Expand Down
2 changes: 1 addition & 1 deletion doc/README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ Reference document
* [proxy-mirror](plugins/proxy-mirror-cn.md):代理镜像插件提供镜像客户端请求的能力。
* [udp-logger](plugins/udp-logger.md): 将请求记录到UDP服务器
* [tcp-logger](plugins/tcp-logger.md): 将请求记录到TCP服务器

* [kafka-logger](plugins/kafka-logger-cn.md): 将请求记录到外部Kafka服务器。
130 changes: 130 additions & 0 deletions doc/plugins/kafka-logger-cn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,130 @@
<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->

# Summary
- [**定义**](#name)
- [**属性列表**](#attributes)
- [**信息**](#info)
- [**如何开启**](#how-to-enable)
- [**测试插件**](#test-plugin)
- [**禁用插件**](#disable-plugin)

## 定义

`kafka-logger` 是一个插件,可用作ngx_lua nginx模块的Kafka客户端驱动程序。

这将提供将Log数据请求作为JSON对象发送到外部Kafka集群的功能。

## 属性列表

|属性名称 |必选项 |描述|
|--------- |--------|-----------|
| broker_list |必要的| 一系列的Kafka经纪人。|
| kafka_topic |必要的| 定位主题以推送数据。|
| timeout |可选的|上游发送数据超时。|
| async |可选的|布尔值,用于控制是否执行异步推送。|
| key |必要的|消息的密钥。|
| max_retry |可选的|没有重试次数。|

## 信息

异步与同步数据推送之间的区别。

1. 同步模型

如果成功,则返回当前代理和分区的偏移量(** cdata:LL **)。
如果发生错误,则返回“ nil”,并带有描述错误的字符串。

2. 在异步模型中

消息将首先写入缓冲区。
当缓冲区超过`batch_num`时,它将发送到kafka服务器,
或每个`flush_time`刷新缓冲区。

如果成功,则返回“ true”。
如果出现错误,则返回“ nil”,并带有描述错误的字符串(“缓冲区溢出”)。

##### 样本经纪人名单

此插件支持一次推送到多个经纪人。如以下示例所示,指定外部kafka服务器的代理,以使此功能生效。

```json
{
"127.0.0.1":9092,
"127.0.0.1":9093
}
```

## 如何开启

1. 这是有关如何为特定路由启用kafka-logger插件的示例。

```shell
curl http://127.0.0.1:9080/apisix/admin/consumers -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"username": "foo",
"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1"
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}'
```

## 测试插件

* 成功:

```shell
$ curl -i http://127.0.0.1:9080/hello
HTTP/1.1 200 OK
...
hello, world
```

## 禁用插件

当您要禁用`kafka-logger`插件时,这很简单,您可以在插件配置中删除相应的json配置,无需重新启动服务,它将立即生效:

```shell
$ curl http://127.0.0.1:2379/apisix/admin/routes/1 -X PUT -d value='
{
"methods": ["GET"],
"uri": "/hello",
"plugins": {},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:1980": 1
}
}
}'
```
134 changes: 134 additions & 0 deletions doc/plugins/kafka-logger.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
<!--
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
-->

# Summary
- [**Name**](#name)
- [**Attributes**](#attributes)
- [**Info**](#info)
- [**How To Enable**](#how-to-enable)
- [**Test Plugin**](#test-plugin)
- [**Disable Plugin**](#disable-plugin)


## Name

`kafka-logger` is a plugin which works as a Kafka client driver for the ngx_lua nginx module.

This will provide the ability to send Log data requests as JSON objects to external Kafka clusters.

## Attributes

|Name |Requirement |Description|
|--------- |--------|-----------|
| broker_list |required| An array of Kafka brokers.|
| kafka_topic |required| Target topic to push data.|
| timeout |optional|Timeout for the upstream to send data.|
| async |optional|Boolean value to control whether to perform async push.|
| key |required|Key for the message.|
| max_retry |optional|No of retries|

## Info

Difference between async and the sync data push.

1. In sync model

In case of success, returns the offset (** cdata: LL **) of the current broker and partition.
In case of errors, returns `nil` with a string describing the error.

2. In async model

The `message` will write to the buffer first.
It will send to the kafka server when the buffer exceed the `batch_num`,
or every `flush_time` flush the buffer.

In case of success, returns `true`.
In case of errors, returns `nil` with a string describing the error (`buffer overflow`).

##### Sample broker list

This plugin supports to push in to more than one broker at a time. Specify the brokers of the external kafka servers as below
sample to take effect of this functionality.

```json
{
"127.0.0.1":9092,
"127.0.0.1":9093
}
```

## How To Enable

1. Here is an examle on how to enable kafka-logger plugin for a specific route.

```shell
curl http://127.0.0.1:9080/apisix/admin/consumers -H 'X-API-KEY: edd1c9f034335f136f87ad84b625c8f1' -X PUT -d '
{
"username": "foo",
"plugins": {
"kafka-logger": {
"broker_list" :
{
"127.0.0.1":9092
},
"kafka_topic" : "test2",
"key" : "key1"
}
},
"upstream": {
"nodes": {
"127.0.0.1:1980": 1
},
"type": "roundrobin"
},
"uri": "/hello"
}'
```

## Test Plugin

* success:

```shell
$ curl -i http://127.0.0.1:9080/hello
HTTP/1.1 200 OK
...
hello, world
```

## Disable Plugin

When you want to disable the `kafka-logger` plugin, it is very simple,
you can delete the corresponding json configuration in the plugin configuration,
no need to restart the service, it will take effect immediately:

```shell
$ curl http://127.0.0.1:2379/apisix/admin/routes/1 -X PUT -d value='
{
"methods": ["GET"],
"uri": "/hello",
"plugins": {},
"upstream": {
"type": "roundrobin",
"nodes": {
"127.0.0.1:1980": 1
}
}
}'
```
Loading

0 comments on commit 79b2339

Please sign in to comment.