Skip to content

Commit

Permalink
feature: supported zipkin plugin.
Browse files Browse the repository at this point in the history
  • Loading branch information
moonming committed Jul 24, 2019
1 parent 3eaa17c commit bb2acce
Show file tree
Hide file tree
Showing 8 changed files with 679 additions and 1 deletion.
3 changes: 2 additions & 1 deletion conf/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ plugins: # plugin list
- example-plugin
- limit-req
- limit-count
- limit-conn
- key-auth
- prometheus
- limit-conn
- node-status
- zipkin
20 changes: 20 additions & 0 deletions lua/apisix/core/request.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ local _M = {version = 0.1}


local function _headers(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
local headers = ctx.headers
if not headers then
headers = get_headers()
Expand All @@ -20,6 +23,9 @@ _M.headers = _headers


function _M.header(ctx, name)
if not ctx then
ctx = ngx.ctx.api_ctx
end
return _headers(ctx)[name]
end

Expand All @@ -28,15 +34,29 @@ end
-- so if there is a load balancer between downstream client and APISIX,
-- this function will return the ip of load balancer.
function _M.get_ip(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
return ctx.var.realip_remote_addr or ctx.var.remote_addr or ''
end


-- get remote address of downstream client,
-- in cases there is a load balancer between downstream client and APISIX.
function _M.get_remote_client_ip(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
return ctx.var.remote_addr or ''
end


function _M.get_remote_client_port(ctx)
if not ctx then
ctx = ngx.ctx.api_ctx
end
return tonumber(ctx.var.remote_port)
end


return _M
128 changes: 128 additions & 0 deletions lua/apisix/plugins/zipkin.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
local core = require("apisix.core")
local new_tracer = require("opentracing.tracer").new
local zipkin_codec = require("apisix.plugins.zipkin.codec")
local new_random_sampler = require("apisix.plugins.zipkin.random_sampler").new
local new_reporter = require("apisix.plugins.zipkin.reporter").new

local plugin_name = "zipkin"


-- You can follow this document to write schema:
-- https://github.com/Tencent/rapidjson/blob/master/bin/draft-04/schema
-- rapidjson not supported `format` in draft-04 yet
local schema = {
type = "object",
properties = {
endpoint = {type = "string"},
sample_ratio = {type = "number",
default = 0.001, minimum = 0.00001, maximum = 1}
},
required = {"endpoint"}
}


local _M = {
version = 0.1,
priority = -1000, -- last running plugin
name = plugin_name,
schema = schema,
}


function _M.check_schema(conf)
local ok, err = core.schema.check(schema, conf)

if not ok then
return false, err
end

return true
end


local function create_tracer(conf)
local tracer = new_tracer(new_reporter(conf), new_random_sampler(conf))
tracer:register_injector("http_headers", zipkin_codec.new_injector())
tracer:register_extractor("http_headers", zipkin_codec.new_extractor())
return tracer
end

local function report2endpoint(premature, reporter)
if premature then
return
end

local ok, err = reporter:flush()
if not ok then
core.log.error("reporter flush ", err)
return
end
end


function _M.rewrite(conf, ctx)
local tracer = core.lrucache.plugin_ctx(plugin_name, ctx,
create_tracer, conf)

local wire_context = tracer:extract("http_headers", core.request.headers(ctx))

local start_timestamp = ngx.req.start_time()
local request_span = tracer:start_span("kong.request", {
child_of = wire_context,
start_timestamp = start_timestamp,
tags = {
component = "apisix",
["span.kind"] = "server",
["http.method"] = ctx.var.method,
["http.url"] = ctx.var.request_uri,
["peer.ipv4"] = core.request.get_remote_client_ip(ctx), -- TODO: support ipv6
["peer.port"] = core.request.get_remote_client_port(ctx),
}
})

local rewrite_span = ctx.opentracing.rewrite_span:start_child_span(
"apisix.rewrite", start_timestamp)

ctx.opentracing = {
tracer = tracer,
wire_context = wire_context,
request_span = request_span,
rewrite_span = rewrite_span,
access_span = nil,
proxy_span = nil,
header_filter_span = nil,
header_filter_finished = false,
body_filter_span = nil,
}

ctx.REWRITE_END_TIME = tracer:time()
ctx.opentracing.rewrite_span:finish(ctx.REWRITE_END_TIME)
end

function _M.access(conf, ctx)
local opentracing = ctx.opentracing

opentracing.proxy_span = ctx.opentracing.access_span:start_child_span(
"apisix.access", ctx.KONG_ACCESS_START / 1000)

-- Want to send headers to upstream
local outgoing_headers = {}
opentracing.tracer:inject(opentracing.proxy_span, "http_headers", outgoing_headers)
local set_header = kong.service.request.set_header
for k, v in pairs(outgoing_headers) do
set_header(k, v)
end

end


function _M.log(conf, ctx)
local tracer = ctx.opentracing.tracer
local reporter = tracer.reporter
local ok, err = ngx.timer.at(0, report2endpoint, reporter)
if not ok then
core.log.error("failed to create timer: ", err)
end
end

return _M
101 changes: 101 additions & 0 deletions lua/apisix/plugins/zipkin/codec.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
local core = require("apisix.core")
local to_hex = require "resty.string".to_hex
local new_span_context = require("opentracing.span_context").new

local function hex_to_char(c)
return string.char(tonumber(c, 16))
end

local function from_hex(str)
if str ~= nil then -- allow nil to pass through
str = str:gsub("%x%x", hex_to_char)
end
return str
end

local function new_extractor()
return function(headers)
-- X-B3-Sampled: if an upstream decided to sample this request, we do too.
local sample = headers["x-b3-sampled"]
if sample == "1" or sample == "true" then
sample = true
elseif sample == "0" or sample == "false" then
sample = false
elseif sample ~= nil then
core.log.warn("x-b3-sampled header invalid; ignoring.")
sample = nil
end

-- X-B3-Flags: if it equals '1' then it overrides sampling policy
-- We still want to warn on invalid sample header, so do this after the above
local debug = headers["x-b3-flags"]
if debug == "1" then
sample = true
elseif debug ~= nil then
core.log.warn("x-b3-flags header invalid; ignoring.")
end

local had_invalid_id = false

local trace_id = headers["x-b3-traceid"]
-- Validate trace id
if trace_id and ((#trace_id ~= 16 and #trace_id ~= 32) or trace_id:match("%X")) then
core.log.warn("x-b3-traceid header invalid; ignoring.")
had_invalid_id = true
end

local parent_span_id = headers["x-b3-parentspanid"]
-- Validate parent_span_id
if parent_span_id and (#parent_span_id ~= 16 or parent_span_id:match("%X")) then
core.log.warn("x-b3-parentspanid header invalid; ignoring.")
had_invalid_id = true
end

local request_span_id = headers["x-b3-spanid"]
-- Validate request_span_id
if request_span_id and (#request_span_id ~= 16 or request_span_id:match("%X")) then
core.log.warn("x-b3-spanid header invalid; ignoring.")
had_invalid_id = true
end

if trace_id == nil or had_invalid_id then
return nil
end

-- Process jaegar baggage header
local baggage = {}
for k, v in pairs(headers) do
local baggage_key = k:match("^uberctx%-(.*)$")
if baggage_key then
baggage[baggage_key] = ngx.unescape_uri(v)
end
end

trace_id = from_hex(trace_id)
parent_span_id = from_hex(parent_span_id)
request_span_id = from_hex(request_span_id)

return new_span_context(trace_id, request_span_id, parent_span_id, sample, baggage)
end
end

local function new_injector()
return function(span_context, headers)
-- We want to remove headers if already present
headers["x-b3-traceid"] = to_hex(span_context.trace_id)
headers["x-b3-parentspanid"] = span_context.parent_id and to_hex(span_context.parent_id) or nil
headers["x-b3-spanid"] = to_hex(span_context.span_id)
local Flags = core.request.header(nil, "x-b3-flags") -- Get from request headers
headers["x-b3-flags"] = Flags
headers["x-b3-sampled"] = (not Flags) and (span_context.should_sample and "1" or "0") or nil
for key, value in span_context:each_baggage_item() do
-- XXX: https://github.com/opentracing/specification/issues/117
headers["uberctx-"..key] = ngx.escape_uri(value)
end
end
end

return {
new_extractor = new_extractor,
new_injector = new_injector,
}
Loading

0 comments on commit bb2acce

Please sign in to comment.