Refactor Lua balancer #2472

Merged (2 commits) on May 10, 2018
40 changes: 20 additions & 20 deletions internal/file/bindata.go

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion internal/ingress/controller/template/template.go
@@ -189,7 +189,6 @@ func buildLuaSharedDictionaries(s interface{}, dynamicConfigurationEnabled bool,
 	if dynamicConfigurationEnabled {
 		out = append(out,
 			"lua_shared_dict configuration_data 5M",
-			"lua_shared_dict round_robin_state 1M",
 			"lua_shared_dict locks 512k",
 			"lua_shared_dict balancer_ewma 1M",
 			"lua_shared_dict balancer_ewma_last_touched_at 1M",
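The round_robin_state dictionary can be dropped because round-robin position now lives inside per-worker resty.roundrobin instances (see balancer/resty.lua below) rather than in a shared dictionary. For background, a minimal sketch of how Lua code uses one of the remaining dictionaries declared above; the key and value here are illustrative:

local configuration_data = ngx.shared.configuration_data

-- set() returns ok, err, forcible; get() returns the stored value (or nil)
local ok, err = configuration_data:set("backends", '[{"name": "example"}]')
if not ok then
  ngx.log(ngx.ERR, "failed to store backends: " .. tostring(err))
end

local backends_json = configuration_data:get("backends")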
68 changes: 23 additions & 45 deletions rootfs/etc/nginx/lua/balancer.lua
@@ -3,25 +3,21 @@ local json = require("cjson")
 local configuration = require("configuration")
 local util = require("util")
 local lrucache = require("resty.lrucache")
-local resty_lock = require("resty.lock")
 local ewma = require("balancer.ewma")
 local sticky = require("sticky")
-local chash = require("balancer.chash")
+local resty_balancer = require("balancer.resty")
 
 -- measured in seconds
 -- for an Nginx worker to pick up the new list of upstream peers
 -- it will take <the delay until controller POSTed the backend object to the Nginx endpoint> + BACKENDS_SYNC_INTERVAL
 local BACKENDS_SYNC_INTERVAL = 1
 
-local ROUND_ROBIN_LOCK_KEY = "round_robin"
 local DEFAULT_LB_ALG = "round_robin"
 
-local round_robin_state = ngx.shared.round_robin_state
-
 local _M = {}
 
-local round_robin_lock = resty_lock:new("locks", {timeout = 0, exptime = 0.1})
-
+-- TODO(elvinefendi) we can probably avoid storing all backends here. We already store them in their respective
+-- load balancer implementations
 local backends, err = lrucache.new(1024)
 if not backends then
@@ -64,65 +60,49 @@ local function balance()
   end
 
   if backend["upstream-hash-by"] then
-    local endpoint = chash.balance(backend)
+    local endpoint = resty_balancer.balance(backend)
     if not endpoint then
       return nil, nil
     end
 
     return endpoint.address, endpoint.port
   end
 
-  if lb_alg == "ip_hash" then
-    -- TODO(elvinefendi) implement me
-    return backend.endpoints[0].address, backend.endpoints[0].port
-  elseif lb_alg == "ewma" then
+  if lb_alg == "ewma" then
     local endpoint = ewma.balance(backend.endpoints)
     return endpoint.address, endpoint.port
   end
 
   if lb_alg ~= DEFAULT_LB_ALG then
-    ngx.log(ngx.WARN, tostring(lb_alg) .. " is not supported, falling back to " .. DEFAULT_LB_ALG)
+    ngx.log(ngx.WARN, string.format("%s is not supported, falling back to %s", tostring(lb_alg), DEFAULT_LB_ALG))
   end
 
-  -- Round-Robin
-  round_robin_lock:lock(backend.name .. ROUND_ROBIN_LOCK_KEY)
-  local last_index = round_robin_state:get(backend.name)
-  local index, endpoint = next(backend.endpoints, last_index)
-  if not index then
-    index = 1
-    endpoint = backend.endpoints[index]
-  end
-  local success, forcible
-  success, err, forcible = round_robin_state:set(backend.name, index)
-  if not success then
-    ngx.log(ngx.WARN, "round_robin_state:set failed " .. err)
-  end
-  if forcible then
-    ngx.log(ngx.WARN, "round_robin_state:set valid items forcibly overwritten")
+  local endpoint = resty_balancer.balance(backend)
+  if not endpoint then
+    return nil, nil
   end
 
   if is_sticky then
     sticky.set_endpoint(endpoint, backend)
   end
-  round_robin_lock:unlock(backend.name .. ROUND_ROBIN_LOCK_KEY)
 
   return endpoint.address, endpoint.port
 end
 
 local function sync_backend(backend)
   backends:set(backend.name, backend)
 
-  -- also reset the respective balancer state since backend has changed
-  round_robin_state:delete(backend.name)
+  local lb_alg = backend["load-balance"] or DEFAULT_LB_ALG
+
+  if backend["upstream-hash-by"] or lb_alg == "round_robin" then
+    resty_balancer.reinit(backend)
+  end
 
   -- TODO: Reset state of EWMA per backend
-  local lb_alg = backend["load-balance"] or DEFAULT_LB_ALG
   if lb_alg == "ewma" then
     ngx.shared.balancer_ewma:flush_all()
     ngx.shared.balancer_ewma_last_touched_at:flush_all()
   end
 
-  -- reset chash for this backend
-  if backend["upstream-hash-by"] then
-    chash.reinit(backend)
-  end
-
   ngx.log(ngx.INFO, "synchronization completed for: " .. backend.name)
 end
 
 local function sync_backends()
@@ -161,7 +141,7 @@ end
 function _M.init_worker()
   _, err = ngx.timer.every(BACKENDS_SYNC_INTERVAL, sync_backends)
   if err then
-    ngx.log(ngx.ERR, "error when setting up timer.every for sync_backends: " .. tostring(err))
+    ngx.log(ngx.ERR, string.format("error when setting up timer.every for sync_backends: %s", tostring(err)))
   end
 end

@@ -186,12 +166,10 @@ function _M.call()
   local ok
   ok, err = ngx_balancer.set_current_peer(host, port)
   if ok then
-    ngx.log(
-      ngx.INFO,
-      "current peer is set to " .. host .. ":" .. port .. " using lb_alg " .. tostring(get_current_lb_alg())
-    )
+    ngx.log(ngx.INFO,
+      string.format("current peer is set to %s:%s using lb_alg %s", host, port, tostring(get_current_lb_alg())))
   else
-    ngx.log(ngx.ERR, "error while setting current upstream peer to: " .. tostring(err))
+    ngx.log(ngx.ERR, string.format("error while setting current upstream peer to %s", tostring(err)))
   end
 end

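After this refactor, balance() delegates both consistent hashing and round-robin to balancer.resty, keeping only EWMA and the sticky-session bookkeeping inline. A condensed, illustrative sketch of the resulting dispatch, using the module names from this diff; pick_endpoint is a hypothetical helper, not a function in this PR:

local resty_balancer = require("balancer.resty")
local ewma = require("balancer.ewma")

local function pick_endpoint(backend, lb_alg)
  -- consistent hashing takes precedence over the configured algorithm
  if backend["upstream-hash-by"] then
    return resty_balancer.balance(backend)
  end

  -- EWMA keeps its own implementation and shared-dict state
  if lb_alg == "ewma" then
    return ewma.balance(backend.endpoints)
  end

  -- anything else, including unsupported algorithms after the warning,
  -- falls back to round-robin via balancer.resty
  return resty_balancer.balance(backend)
end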
48 changes: 0 additions & 48 deletions rootfs/etc/nginx/lua/balancer/chash.lua

This file was deleted.
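The consistent-hash implementation moves into the generic balancer/resty.lua module added below. A hedged sketch of how a hashed backend now resolves through it; the backend table and its values are illustrative:

local resty_balancer = require("balancer.resty")

local backend = {
  name = "hashed-backend",
  -- a non-nil upstream-hash-by makes reinit() pick the resty.chash factory
  ["upstream-hash-by"] = "$request_uri",
  endpoints = {
    { address = "10.0.0.1", port = "8080" },
    { address = "10.0.0.2", port = "8080" },
  },
}

resty_balancer.reinit(backend)
-- balance() hashes the value of ngx.var.request_uri to pick a peer
local endpoint = resty_balancer.balance(backend)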

69 changes: 69 additions & 0 deletions rootfs/etc/nginx/lua/balancer/resty.lua
@@ -0,0 +1,69 @@
local resty_roundrobin = require("resty.roundrobin")
local resty_chash = require("resty.chash")
local util = require("util")

local _M = {}
local instances = {}

local function get_resty_balancer_nodes(endpoints)
local nodes = {}
local weight = 1

for _, endpoint in pairs(endpoints) do
local endpoint_string = endpoint.address .. ":" .. endpoint.port
nodes[endpoint_string] = weight
end

return nodes
end

local function init_resty_balancer(factory, instance, endpoints)
local nodes = get_resty_balancer_nodes(endpoints)

if instance then
instance:reinit(nodes)
else
instance = factory:new(nodes)
end

return instance
end

function _M.balance(backend)
local instance = instances[backend.name]
if not instance then
ngx.log(ngx.ERR, "no LB algorithm instance was found")
return nil
end

local endpoint_string
if backend["upstream-hash-by"] then
local key = util.lua_ngx_var(backend["upstream-hash-by"])
endpoint_string = instance:find(key)
else
endpoint_string = instance:find()
end

local address, port = util.split_pair(endpoint_string, ":")
return { address = address, port = port }
end

function _M.reinit(backend)
local instance = instances[backend.name]
local factory = resty_roundrobin
if backend["upstream-hash-by"] then
factory = resty_chash
end

if instance then
local mt = getmetatable(instance)
if mt.__index ~= factory then
ngx.log(ngx.INFO, "LB algorithm has been changed, resetting the instance")
instance = nil
end
end

instances[backend.name] = init_resty_balancer(factory, instance, backend.endpoints)
end

return _M
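A minimal usage sketch of this module for the round-robin path; the backend values are illustrative:

local resty_balancer = require("balancer.resty")

local backend = {
  name = "example-backend",
  endpoints = {
    { address = "10.0.0.1", port = "8080" },
    { address = "10.0.0.2", port = "8080" },
  },
}

-- no upstream-hash-by, so reinit() uses the resty.roundrobin factory
resty_balancer.reinit(backend)

local endpoint = resty_balancer.balance(backend)
-- endpoint is e.g. { address = "10.0.0.1", port = "8080" }

The metatable check in reinit() relies on resty.chash and resty.roundrobin exposing their class table as the instance's __index, so comparing __index against the desired factory detects when a backend switched algorithms and the stale instance must be rebuilt.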
17 changes: 7 additions & 10 deletions rootfs/etc/nginx/lua/test/balancer_test.lua
@@ -49,7 +49,7 @@ local function init()
   mock_sticky = {}
   mock_ngx_balancer = {}
   mock_ewma = {}
-  mock_chash = {}
+  mock_resty_balancer = {}
   mock_backends = dict_generator(default_backends)
   mock_lrucache = {
     new = function () return mock_backends end
@@ -63,7 +63,6 @@
   }
   _G.ngx = {
     shared = {
-      round_robin_state = dict_generator({}),
       balancer_ewma = dict_generator({}),
       balancer_ewma_last_touched_at = dict_generator({}),
     },
@@ -79,7 +78,7 @@
   package.loaded["cjson"] = mock_cjson
   package.loaded["resty.lock"] = mock_lock
   package.loaded["balancer.ewma"] = mock_ewma
-  package.loaded["balancer.chash"] = mock_chash
+  package.loaded["balancer.resty"] = mock_resty_balancer
   package.loaded["configuration"] = mock_config
   package.loaded["sticky"] = mock_sticky
   balancer = require("balancer")
@@ -106,7 +105,6 @@ describe("[balancer_test]", function()

   before_each(function()
     _G.ngx.get_phase = nil
-    _G.ngx.shared.round_robin_state._vals = {}
     _G.ngx.var = {}
     mock_backends._vals = default_backends
     mock_sticky.is_sticky = function(b) return false end
@@ -148,6 +146,7 @@
     local set_more_tries_spy = spy.on(mock_ngx_balancer, "set_more_tries")
     local set_current_peer_spy = spy.on(mock_ngx_balancer, "set_current_peer")
 
+    mock_resty_balancer.balance = function(b) return {address = "000.000.000", port = "8080"} end
     assert.has_no_errors(balancer.call)
     assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_rr_backend")
     assert.spy(set_more_tries_spy).was_called_with(1)
@@ -157,6 +156,7 @@
     mock_ngx_balancer.set_more_tries:clear()
     mock_ngx_balancer.set_current_peer:clear()
 
+    mock_resty_balancer.balance = function(b) return {address = "000.000.001", port = "8081"} end
     assert.has_no_errors(balancer.call)
     assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_rr_backend")
     assert.spy(set_more_tries_spy).was_called_with(1)
@@ -205,6 +205,7 @@
     local set_more_tries_spy = spy.on(mock_ngx_balancer, "set_more_tries")
     local set_current_peer_spy = spy.on(mock_ngx_balancer, "set_current_peer")
 
+    mock_resty_balancer.balance = function(b) return {address = "000.000.000", port = "8080"} end
     assert.has_no_errors(balancer.call)
     assert.spy(backend_get_spy).was_called_with(match.is_table(), "mock_rr_backend")
     assert.spy(set_more_tries_spy).was_called_with(1)
@@ -261,35 +262,31 @@
it("lb_alg=round_robin, updates backend when sync is required", function()
mock_config.get_backends_data = function() return { default_backends.mock_rr_backend } end
mock_backends._vals = {}
_G.ngx.shared.round_robin_state._vals = default_backends.mock_rr_backend

local backend_set_spy = spy.on(mock_backends, "set")
local rr_delete_spy = spy.on(_G.ngx.shared.round_robin_state, "delete")
local ewma_flush_spy = spy.on(_G.ngx.shared.balancer_ewma, "flush_all")
local ewma_lta_flush_spy = spy.on(_G.ngx.shared.balancer_ewma_last_touched_at, "flush_all")

mock_resty_balancer.balance = function(b) return {address = "000.000.000", port = "8080"} end
mock_resty_balancer.reinit = function(b) return end
assert.has_no_errors(balancer.init_worker)
assert.spy(backend_set_spy)
.was_called_with(match.is_table(), default_backends.mock_rr_backend.name, match.is_table())
assert.spy(rr_delete_spy).was_called_with(match.is_table(), default_backends.mock_rr_backend.name)
assert.spy(ewma_flush_spy).was_not_called()
assert.spy(ewma_lta_flush_spy).was_not_called()
end)

it("lb_alg=ewma, updates backend when sync is required", function()
mock_config.get_backends_data = function() return { default_backends.mock_ewma_backend } end
mock_backends._vals = {}
_G.ngx.shared.round_robin_state._vals = default_backends.mock_ewma_backend

local backend_set_spy = spy.on(mock_backends, "set")
local rr_delete_spy = spy.on(_G.ngx.shared.round_robin_state, "delete")
local ewma_flush_spy = spy.on(_G.ngx.shared.balancer_ewma, "flush_all")
local ewma_lta_flush_spy = spy.on(_G.ngx.shared.balancer_ewma_last_touched_at, "flush_all")

assert.has_no_errors(balancer.init_worker)
assert.spy(backend_set_spy)
.was_called_with(match.is_table(), default_backends.mock_ewma_backend.name, match.is_table())
assert.spy(rr_delete_spy).was_called_with(match.is_table(), default_backends.mock_ewma_backend.name)
assert.spy(ewma_flush_spy).was_called_with(match.is_table())
assert.spy(ewma_lta_flush_spy).was_called_with(match.is_table())
end)
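A hedged sketch of how a further assertion against the new module could look under the same busted setup; the spy API is the one used above, and the test name and stub are illustrative:

it("reinits resty balancer when a round_robin backend is synced", function()
  mock_config.get_backends_data = function() return { default_backends.mock_rr_backend } end
  mock_backends._vals = {}

  mock_resty_balancer.reinit = function(b) return end
  local reinit_spy = spy.on(mock_resty_balancer, "reinit")

  assert.has_no_errors(balancer.init_worker)
  assert.spy(reinit_spy).was_called()
end)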
8 changes: 8 additions & 0 deletions rootfs/etc/nginx/lua/util.lua
@@ -1,5 +1,13 @@
 local _M = {}
+local string_len = string.len
+local string_sub = string.sub
 
+-- given an Nginx variable, e.g. $request_uri,
+-- returns the value of ngx.var["request_uri"]
+function _M.lua_ngx_var(ngx_var)
+  local var_name = string_sub(ngx_var, 2)
+  return ngx.var[var_name]
+end
+
 function _M.split_pair(pair, separator)
   local i = pair:find(separator)
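A short usage sketch of the two helpers, mirroring how balancer/resty.lua calls them above; the values are illustrative:

local util = require("util")

-- split an "address:port" endpoint string, as balancer.resty does
local address, port = util.split_pair("10.0.0.1:8080", ":")
-- address == "10.0.0.1", port == "8080"

-- resolve the value behind an Nginx variable name at request time,
-- e.g. for upstream-hash-by = "$request_uri"
local hash_key = util.lua_ngx_var("$request_uri")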