Skip to content

Commit

Permalink
Kafka Logger.
Browse files Browse the repository at this point in the history
  • Loading branch information
Akayeshmantha committed Mar 23, 2020
1 parent 8d745c7 commit 0b5ee35
Show file tree
Hide file tree
Showing 9 changed files with 386 additions and 1 deletion.
8 changes: 8 additions & 0 deletions .travis/linux_openresty_runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ before_install() {
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
docker pull redis:3.0-alpine
docker run --rm -itd -p 6379:6379 --name apisix_redis redis:3.0-alpine
# spin up kafka cluster for tests (1 zookeeper and 1 kafka instance)
docker pull bitnami/zookeeper:3.6.0
docker pull bitnami/kafka:latest
docker network create kafka-net --driver bridge
docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
sleep 5
docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
}

do_install() {
Expand Down
8 changes: 8 additions & 0 deletions .travis/linux_tengine_runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,14 @@ before_install() {
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
docker pull redis:3.0-alpine
docker run --rm -itd -p 6379:6379 --name apisix_redis redis:3.0-alpine
# spin up kafka cluster for tests (1 zookeeper and 1 kafka instance)
docker pull bitnami/zookeeper:3.6.0
docker pull bitnami/kafka:latest
docker network create kafka-net --driver bridge
docker run --name zookeeper-server -d -p 2181:2181 --network kafka-net -e ALLOW_ANONYMOUS_LOGIN=yes bitnami/zookeeper:3.6.0
docker run --name kafka-server1 -d --network kafka-net -e ALLOW_PLAINTEXT_LISTENER=yes -e KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper-server:2181 -e KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 -p 9092:9092 -e KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=true bitnami/kafka:latest
sleep 5
docker exec -it kafka-server1 /opt/bitnami/kafka/bin/kafka-topics.sh --create --zookeeper zookeeper-server:2181 --replication-factor 1 --partitions 1 --topic test2
}

tengine_install() {
Expand Down
20 changes: 20 additions & 0 deletions .travis/osx_openresty_runner.sh
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,26 @@ before_install() {
sudo cpanm --notest Test::Nginx >build.log 2>&1 || (cat build.log && exit 1)
export_or_prefix
luarocks install --lua-dir=${OPENRESTY_PREFIX}/luajit luacov-coveralls --local --tree=deps

# spin up kafka cluster for tests (1 zookeeper and 1 kafka instance)
export ZK_VER=3.5.7
export SCALA_VER=2.11
export KAFKA_VER=2.4.0

if [ ! -f download-cache/kafka_$SCALA_VER-$KAFKA_VER.tgz ]; then wget -P download-cache http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/$KAFKA_VER/kafka_$SCALA_VER-$KAFKA_VER.tgz;fi
if [ ! -f download-cache/apache-zookeeper-$ZK_VER-bin.tar.gz ]; then wget -P download-cache https://mirrors.tuna.tsinghua.edu.cn/apache/zookeeper/zookeeper-$ZK_VER/apache-zookeeper-$ZK_VER-bin.tar.gz;fi

sudo tar -xzf download-cache/apache-zookeeper-$ZK_VER-bin.tar.gz -C /usr/local/
sudo tar -xzf download-cache/kafka_$SCALA_VER-$KAFKA_VER.tgz -C /usr/local/
sudo mv /usr/local/kafka_$SCALA_VER-$KAFKA_VER /usr/local/kafka
sudo mv /usr/local/apache-zookeeper-$ZK_VER-bin /usr/local/zookeeper
sudo cp /usr/local/zookeeper/conf/zoo_sample.cfg /usr/local/zookeeper/conf/zoo.cfg
sudo sed -i '' '$a\
host\.name=127.0.0.1' /usr/local/kafka/config/server.properties
sudo /usr/local/zookeeper/bin/zkServer.sh start
sudo /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
sleep 1
/usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 2 --topic test2
}

do_install() {
Expand Down
1 change: 1 addition & 0 deletions conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ plugins: # plugin list
- proxy-cache
- tcp-logger
- proxy-mirror
- kafka-logger

stream_plugins:
- mqtt-proxy
106 changes: 106 additions & 0 deletions lua/apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local core = require("apisix.core")
local log_util = require("apisix.utils.log-util")
local producer = require ("resty.kafka.producer")
local pairs = pairs
local type = type
local table = table

local plugin_name = "kafka-logger"
local ngx = ngx

local timer_at = ngx.timer.at

-- JSON Schema describing the kafka-logger plugin configuration.
local schema = {
type = "object",
properties = {
-- map of broker host -> port; iterated as string host / number port
-- when building the producer's broker list
broker_list = {
type = "object"
},
timeout = {   -- timeout in milliseconds
type = "integer", minimum = 1, default= 2000
},
-- Kafka topic that log entries are produced to
kafka_topic = {type = "string"},
-- when true, the producer is created in buffered async mode
async = {type = "boolean", default = false},
-- message key passed to producer:send for each log entry
key = {type = "string"},
-- maximum send retries handed to the producer config
max_retry = {type = "integer", minimum = 0 , default = 3},
},
required = {"broker_list", "kafka_topic", "key"}
}

-- Plugin descriptor consumed by the APISIX plugin loader.
local _M = {
version = 0.1,
priority = 403,    -- between tcp-logger (405) and udp-logger (400)
name = plugin_name,
schema = schema,
}

-- Validate a user-supplied plugin configuration against the
-- kafka-logger schema; returns ok plus an error message on failure.
function _M.check_schema(conf)
    local ok, err = core.schema.check(schema, conf)
    return ok, err
end

-- Timer callback: build a Kafka producer from the plugin configuration
-- and deliver one encoded log entry to the configured topic.
-- @param premature   true when the timer fires during worker shutdown
-- @param conf        validated plugin configuration (see `schema`)
-- @param log_message JSON-encoded request log to send
local function log(premature, conf, log_message)
    if premature then
        return
    end

    -- an empty broker map can never deliver anything; bail out early
    -- instead of constructing a producer with no brokers
    if core.table.nkeys(conf.broker_list) == 0 then
        core.log.error("failed to identify the broker specified")
        return
    end

    local broker_list = {}
    local broker_config = {}

    -- conf.broker_list is a host -> port map; convert it to the
    -- array-of-{host, port} form expected by lua-resty-kafka,
    -- skipping entries of the wrong type
    for host, port in pairs(conf.broker_list) do
        if type(host) == 'string'
           and type(port) == 'number' then
            table.insert(broker_list, {host = host, port = port})
        end
    end

    broker_config["request_timeout"] = conf.timeout
    broker_config["max_retry"] = conf.max_retry

    -- Async producers will queue logs and push them when the buffer exceeds.
    if conf.async then
        broker_config["producer_type"] = "async"
    end

    local prod, err = producer:new(broker_list, broker_config)
    if err then
        core.log.error("failed to identify the broker specified", err)
        return
    end

    local ok, send_err = prod:send(conf.kafka_topic, conf.key, log_message)
    if not ok then
        core.log.error("failed to send data to Kafka topic", send_err)
    end
end

-- Log-phase handler: serialize the full request log and hand delivery
-- off to a zero-delay timer so the request itself never blocks on
-- broker I/O; returns the timer handle (or nil plus an error).
function _M.log(conf)
    local entry = core.json.encode(log_util.get_full_log(ngx))
    return timer_at(0, log, conf, entry)
end

return _M
1 change: 1 addition & 0 deletions rockspec/apisix-master-0.rockspec
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ dependencies = {
"lua-resty-prometheus = 1.0",
"jsonschema = 0.8",
"lua-resty-ipmatcher = 0.6",
"lua-resty-kafka = 0.07",
}

build = {
Expand Down
2 changes: 1 addition & 1 deletion t/admin/plugins.t
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ __DATA__
--- request
GET /apisix/admin/plugins/list
--- response_body_like eval
qr/\["limit-req","limit-count","limit-conn","key-auth","basic-auth","prometheus","node-status","jwt-auth","zipkin","ip-restriction","grpc-transcode","serverless-pre-function","serverless-post-function","openid-connect","proxy-rewrite","redirect","response-rewrite","fault-injection","udp-logger","wolf-rbac","proxy-cache","tcp-logger","proxy-mirror"\]/
qr/\["limit-req","limit-count","limit-conn","key-auth","basic-auth","prometheus","node-status","jwt-auth","zipkin","ip-restriction","grpc-transcode","serverless-pre-function","serverless-post-function","openid-connect","proxy-rewrite","redirect","response-rewrite","fault-injection","udp-logger","wolf-rbac","proxy-cache","tcp-logger","proxy-mirror","kafka-logger"\]/
--- no_error_log
[error]
Expand Down
1 change: 1 addition & 0 deletions t/debug/debug-mode.t
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ loaded plugin and sort by priority: 899 name: response-rewrite
loaded plugin and sort by priority: 506 name: grpc-transcode
loaded plugin and sort by priority: 500 name: prometheus
loaded plugin and sort by priority: 405 name: tcp-logger
loaded plugin and sort by priority: 403 name: kafka-logger
loaded plugin and sort by priority: 400 name: udp-logger
loaded plugin and sort by priority: 0 name: example-plugin
loaded plugin and sort by priority: -1000 name: zipkin
Expand Down
Loading

0 comments on commit 0b5ee35

Please sign in to comment.