Skip to content

Commit

Permalink
[proxy] optionally drop synchronous post_action
Browse files Browse the repository at this point in the history
doing synchronous call in the post_action will
push the latency to the client on the same connection

using timers we can make that fully asynchronous
not being tied to the request phase at all

setting APICAST_REPORTING_THREADS environment variable
to a number greater than 0 will enable reporting in asynchronous
timers instead of synchronous call

the number defines a maximum number of parallel calls
to backend per worker

if there is new report coming but no connections is available
APIcast will wait for up to 10 seconds before falling back to the
synchronous call

please note this is **experimental** feature
  • Loading branch information
mikz committed Mar 14, 2017
1 parent d92cf03 commit 363d5aa
Show file tree
Hide file tree
Showing 9 changed files with 351 additions and 44 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ and this project adheres to [Semantic Versioning](http://semver.org/).
- Backend HTTP client that uses cosockets [PR #295](https://github.com/3scale/apicast/pull/295)
- Ability to customize main section of nginx configuration (and expose more env variables) [PR #292](https://github.com/3scale/apicast/pull/292)
- Ability to lock service to specific configuration version [PR #293](https://github.com/3scale/apicast/pull/292)
- Experimental option for true out of band reporting (`APICAST_REPORTING_WORKERS`) [PR #290](https://github.com/3scale/apicast/pull/290)
- Experimental option for true out of band reporting (`APICAST_REPORTING_THREADS`) [PR #290](https://github.com/3scale/apicast/pull/290)
- `/status/info` endpoint to the Management API [PR #290](https://github.com/3scale/apicast/pull/290)

### Removed
Expand Down
7 changes: 4 additions & 3 deletions apicast/conf.d/apicast.conf
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ location = /threescale_authrep {
local log = ngx.var.arg_log;
if log then return '&' .. ngx.unescape_uri(log) end
}
set_by_lua_block $backend_endpoint { return ngx.ctx.backend_endpoint }
set $path /transactions/authrep.xml?$backend_authentication_type=$backend_authentication_value&service_id=$service_id&$usage&$credentials$log;
proxy_pass_request_headers off;
proxy_http_version 1.1;
Expand All @@ -31,7 +32,6 @@ location = /threescale_authrep {
}
}


location @out_of_band_authrep_action {
internal;

Expand All @@ -43,11 +43,13 @@ location @out_of_band_authrep_action {

log_by_lua_block {
ngx.var.post_action_impact = ngx.var.request_time - ngx.var.original_request_time
ngx.log(ngx.INFO, '[authrep] ', ngx.var.request_uri, ' ', ngx.var.status)
require('module'):log()
}
}

set $backend_endpoint 'http://127.0.0.1:8081';


location / {
set $cached_key null;
set $credentials null;
Expand All @@ -62,7 +64,6 @@ location / {
set $redirect_url null;

set $backend_host 'backend';
set $backend_endpoint 'http://127.0.0.1:8081';
set $backend_authentication_type null;
set $backend_authentication_value null;
set $version null;
Expand Down
2 changes: 2 additions & 0 deletions apicast/conf/nginx.conf
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ env BACKEND_ENDPOINT_OVERRIDE;

include ../main.d/*.conf;

env APICAST_REPORTING_THREADS;

error_log /dev/null emerg;

events {
Expand Down
18 changes: 13 additions & 5 deletions apicast/src/apicast.lua
Original file line number Diff line number Diff line change
Expand Up @@ -65,18 +65,26 @@ end

function _M.post_action()
local request_id = ngx.var.original_request_id
local p = post_action_proxy[request_id]
post_action_proxy[request_id] = nil
p:post_action()
local p = ngx.ctx.proxy or post_action_proxy[request_id]

if p then
p:post_action()
else
ngx.log(ngx.WARN, 'could not find proxy for request id: ', request_id)
end
end

function _M.access()
local p = ngx.ctx.proxy
local fun = p:call() -- proxy:access() or oauth handler
local request_id = ngx.var.request_id
local fun = p:call() -- proxy:access() or oauth handler

local ok, err = fun()

post_action_proxy[request_id] = p
ngx.var.original_request_id = request_id
return fun()

return ok, err
end

_M.body_filter = noop
Expand Down
10 changes: 8 additions & 2 deletions apicast/src/backend_client.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

local setmetatable = setmetatable
local concat = table.concat
local insert = table.insert
local len = string.len

local http_ng = require('resty.http_ng')
local user_agent = require('user_agent')
Expand Down Expand Up @@ -83,11 +85,15 @@ local function call_backend_transaction(self, path, ...)

local args = { self.authentication, ... }

local query = {}
for i=1, #args do
args[i] = ngx.encode_args(args[i])
local arg = ngx.encode_args(args[i])
if len(arg) > 0 then
insert(query, arg)
end
end

local url = resty_url.join(endpoint, '/transactions/', path, '?', concat(args, '&'))
local url = resty_url.join(endpoint, '/transactions/', path, '?', concat(query, '&'))

local res = http_client.get(url)

Expand Down
121 changes: 91 additions & 30 deletions apicast/src/proxy.lua
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
local env = require 'resty.env'
local custom_config = env.get('APICAST_CUSTOM_CONFIG')
local configuration_store = require 'configuration_store'
local util = require('util')

local oauth = require 'oauth'
local resty_url = require 'resty.url'
Expand All @@ -14,13 +15,18 @@ local concat = table.concat
local gsub = string.gsub
local tonumber = tonumber
local setmetatable = setmetatable
local exit = ngx.exit
local encode_args = ngx.encode_args
local resty_resolver = require 'resty.resolver'
local semaphore = require('ngx.semaphore')
local backend_client = require('backend_client')
local timers = semaphore.new(tonumber(env.get('APICAST_REPORTING_THREADS') or 0))

local empty = {}

local response_codes = env.enabled('APICAST_RESPONSE_CODES')

local post_action_needed = response_codes or timers:count() < 1

local _M = { }

local mt = {
Expand Down Expand Up @@ -134,7 +140,7 @@ local http = {
local backend_upstream = ngx.ctx.backend_upstream
local previous_real_url = ngx.var.real_url
ngx.log(ngx.DEBUG, '[ctx] copying backend_upstream of size: ', #backend_upstream)
local res = ngx.location.capture(assert(url), { share_all_vars = true, ctx = { backend_upstream = backend_upstream } })
local res = ngx.location.capture(assert(url), { share_all_vars = true, ctx = { backend_upstream = backend_upstream, backend_endpoint = ngx.var.backend_endpoint } })

local real_url = ngx.var.real_url

Expand All @@ -150,7 +156,7 @@ local http = {
end
}

local function oauth_authrep(service)
local function oauth_authrep(proxy, service)
local cached_key = ngx.var.cached_key .. ":" .. ngx.var.usage
local access_tokens = assert(ngx.shared.api_keys, 'missing shared dictionary: api_keys')
local is_known = access_tokens:get(cached_key)
Expand All @@ -159,6 +165,7 @@ local function oauth_authrep(service)
ngx.log(ngx.DEBUG, 'apicast cache hit key: ', cached_key)
ngx.var.cached_key = cached_key
else
proxy:set_backend_upstream(service)
local res = http.get("/threescale_oauth_authrep")

if res.status ~= 200 then
Expand All @@ -174,7 +181,7 @@ local function oauth_authrep(service)
end
end

local function authrep(service)
local function authrep(proxy, service)
-- NYI: return to lower frame
local cached_key = ngx.var.cached_key .. ":" .. ngx.var.usage
local api_keys = ngx.shared.api_keys
Expand All @@ -184,6 +191,8 @@ local function authrep(service)
ngx.log(ngx.DEBUG, 'apicast cache hit key: ', cached_key)
ngx.var.cached_key = cached_key
else

proxy:set_backend_upstream(service)
ngx.log(ngx.INFO, 'apicast cache miss key: ', cached_key)
local res = http.get("/threescale_authrep")

Expand All @@ -206,11 +215,15 @@ local function authrep(service)
end
end

function _M.authorize(backend_version, service)
function _M:authorize(backend_version, service)
if backend_version == 'oauth' then
oauth_authrep(service)
oauth_authrep(self, service)
else
authrep(service)
authrep(self, service)
end

if not post_action_needed then
self:post_action(true)
end
end

Expand Down Expand Up @@ -276,8 +289,6 @@ function _M:call(host)
host = host or ngx.var.host
local service = ngx.ctx.service or self:set_service(host)

self:set_backend_upstream(service)

if service.backend_version == 'oauth' then
local f, params = oauth.call()

Expand Down Expand Up @@ -349,25 +360,86 @@ function _M:access(service)
ngx.header["X-3scale-hostname"] = ngx.var.hostname
end

self.authorize(backend_version, service)
self:authorize(backend_version, service)
end


local function response_codes_encoded_data()
local function response_codes_data()
local params = {}

if not response_codes then
return ''
return params
end

if response_codes then
params["log[code]"] = ngx.var.status
end

return ngx.escape_uri(ngx.encode_args(params))
return params
end

local function response_codes_encoded_data()
return ngx.escape_uri(ngx.encode_args(response_codes_data()))
end

local function handle_post_action_response(cached_key, res)
if res.ok == false or res.status ~= 200 then
local api_keys = ngx.shared.api_keys

if api_keys then
ngx.log(ngx.NOTICE, 'apicast cache delete key: ', cached_key, ' cause status ', res.status)
api_keys:delete(cached_key)
else
ngx.log(ngx.ALERT, 'apicast cache error missing shared memory zone api_keys')
end

ngx.log(ngx.ERR, 'http_client error: ', res.error, ' status: ', res.status)
end
end

local function post_action(_, cached_key, backend, ...)
local res = util.timer('backend post_action', backend.authrep, backend, ...)

handle_post_action_response(cached_key, res)

if not post_action_needed then
timers:post(1)
end
end

local function capture_post_action(self, cached_key, service)
self:set_backend_upstream(service)

local auth_uri = service.backend_version == 'oauth' and 'threescale_oauth_authrep' or 'threescale_authrep'
local res = http.get("/".. auth_uri .."?log=" .. response_codes_encoded_data())

handle_post_action_response(cached_key, res)
end

function _M:post_action()
local function timer_post_action(self, cached_key, service)
local backend = assert(backend_client:new(service), 'missing backend')

local ok, err

if post_action_needed then
ok = true
else
ok, err = timers:wait(10)
end

if ok then
-- TODO: try to do this in different phase and use semaphore to limit number of background threads
-- TODO: Also it is possible to use sets in shared memory to enqueue work
ngx.timer.at(0, post_action, cached_key, backend, ngx.ctx.usage, ngx.ctx.credentials, response_codes_data())
else
ngx.log(ngx.ERR, 'failed to acquire timer: ', err)
return capture_post_action(self, cached_key, service)
end
end

function _M:post_action(force)
if not post_action_needed and not force then
return nil, 'post action not needed'
end

local cached_key = ngx.var.cached_key

Expand All @@ -376,26 +448,15 @@ function _M:post_action()

local service_id = ngx.var.service_id
local service = ngx.ctx.service or self.configuration:find_by_id(service_id)
self:set_backend_upstream(service)

local auth_uri = service.backend_version == 'oauth' and 'threescale_oauth_authrep' or 'threescale_authrep'
local res = http.get("/".. auth_uri .."?log=" .. response_codes_encoded_data())

if res.status ~= 200 then
local api_keys = ngx.shared.api_keys

if api_keys then
ngx.log(ngx.NOTICE, 'apicast cache delete key: ', cached_key, ' cause status ', res.status)
api_keys:delete(cached_key)
else
ngx.log(ngx.ALERT, 'apicast cache error missing shared memory zone api_keys')
end
if post_action_needed then
capture_post_action(self, cached_key, service)
else
timer_post_action(self, cached_key, service)
end
else
ngx.log(ngx.INFO, '[async] skipping after action, no cached key')
end

exit(ngx.HTTP_OK)
end

if custom_config then
Expand Down
13 changes: 12 additions & 1 deletion doc/parameters.md
Original file line number Diff line number Diff line change
Expand Up @@ -131,4 +131,15 @@ You should enable the debug level only for debugging.

Replace `${ID}` with the actual Service ID. The value should be the configuration version you can see in the configuration history on the Admin Portal.

Setting it to particual version will make it not auto-update and always use that version.
Setting it to particual version will make it not auto-update and always use that version.

### `APICAST_REPORTING_THREADS`

**Default**: 0
**Value:**: integer >= 0

Value greater than 0 is going to enable out-of-band reporting to backend.
This is new **experimental** feature for increasing performance. Client
Won't see the backend latency and everything will be processed asynchronously.
This value determines how many asynchronous reports can be running simultainesly
before client starts being throttled by adding latency.
4 changes: 2 additions & 2 deletions t/003-apicast.t
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ GET /?user_key=value
--- response_body chomp
no mapping rules!
--- error_code: 412
--- error_log
skipping after action, no cached key
=== TEST 3: authentication credentials invalid
The message is configurable and default status is 403.
Expand Down Expand Up @@ -469,6 +467,7 @@ env RESOLVER=127.0.0.1:1953;
--- http_config
include $TEST_NGINX_UPSTREAM_CONFIG;
lua_package_path "$TEST_NGINX_LUA_PATH";
lua_shared_dict api_keys 1m;
init_by_lua_block {
require('configuration_loader').mock({
services = {
Expand Down Expand Up @@ -502,6 +501,7 @@ env RESOLVER=127.0.0.1:1953;
if ngx.var.host == 'localhost.example.com' then
ngx.exit(200)
else
ngx.log(ngx.ERR, 'invalid host: ', ngx.var.host)
ngx.exit(404)
end
}
Expand Down
Loading

0 comments on commit 363d5aa

Please sign in to comment.