Skip to content

Commit

Permalink
chore(kafka-logger): delay log partition_id (#3481)
Browse files Browse the repository at this point in the history
  • Loading branch information
spacewander authored Feb 2, 2021
1 parent b033a75 commit 436beab
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 3 deletions.
48 changes: 48 additions & 0 deletions apisix/core/log.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,12 @@
local ngx = ngx
local ngx_log = ngx.log
local require = require
local select = select
local setmetatable = setmetatable
local tostring = tostring
local unpack = unpack
-- avoid loading other module since core.log is the most foundational one
local tab_clear = require("table.clear")


local _M = {version = 0.4}
Expand Down Expand Up @@ -87,4 +92,47 @@ setmetatable(_M, {__index = function(self, cmd)
end})


local delay_tab = setmetatable({
func = function() end,
args = {},
res = nil,
}, {
__tostring = function(self)
-- the `__tostring` will be called twice, the first to get the length and
-- the second to get the data
if self.res then
local res = self.res
-- avoid unexpected reference
self.res = nil
return res
end

local res, err = self.func(unpack(self.args))
if err then
ngx.log(ngx.WARN, "failed to exec: ", err)
end

-- avoid unexpected reference
tab_clear(self.args)
self.res = tostring(res)
return self.res
end
})


-- It works well with log.$level, eg: log.info(..., log.delay_exec(func, ...))
-- Should not use it elsewhere.
function _M.delay_exec(func, ...)
delay_tab.func = func

tab_clear(delay_tab.args)
for i = 1, select('#', ...) do
delay_tab.args[i] = select(i, ...)
end

delay_tab.res = nil
return delay_tab
end


return _M
7 changes: 4 additions & 3 deletions apisix/plugins/kafka-logger.lua
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ function _M.check_schema(conf)
end


local function partition_id(sendbuffer, topic, log_message)
local function get_partition_id(sendbuffer, topic, log_message)
if not sendbuffer.topics[topic] then
core.log.info("current topic in sendbuffer has no message")
return nil
Expand Down Expand Up @@ -113,8 +113,9 @@ local function send_kafka_data(conf, log_message, prod)
end

local ok, err = prod:send(conf.kafka_topic, conf.key, log_message)
core.log.info("partition_id: ", partition_id(prod.sendbuffer,
conf.kafka_topic, log_message))
core.log.info("partition_id: ",
core.log.delay_exec(get_partition_id,
prod.sendbuffer, conf.kafka_topic, log_message))

if not ok then
return nil, "failed to send data to Kafka topic: " .. err
Expand Down

0 comments on commit 436beab

Please sign in to comment.