Skip to content

Commit

Permalink
fix(healthcheck) single worker actively checks the status (#67)
Browse files Browse the repository at this point in the history
  • Loading branch information
locao authored Feb 9, 2021
1 parent 9e5561e commit 45fc129
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 55 deletions.
121 changes: 83 additions & 38 deletions lib/resty/healthcheck.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ local resty_lock = require ("resty.lock")
local re_find = ngx.re.find
local bit = require("bit")
local ngx_now = ngx.now
local ngx_worker_id = ngx.worker.id
local ngx_worker_pid = ngx.worker.pid
local ssl = require("ngx.ssl")
local resty_timer = require "resty.timer"

Expand All @@ -51,6 +53,17 @@ local EMPTY = setmetatable({},{
})


--- Timing constants for the active health check timers.
-- Period, in seconds, at which the active checks are evaluated.
local CHECK_INTERVAL = 0.1
-- Jitter used when starting each worker timer: 10% of the check interval.
local CHECK_JITTER = 0.1 * CHECK_INTERVAL
-- Validity period of the periodic lock: 15 check intervals. The worker that
-- acquires the lock owns it for that long; if it does not update the shm
-- within this period, it is considered unable to continue checking (the
-- worker was probably killed).
local LOCK_PERIOD = 15 * CHECK_INTERVAL


-- Counters: a 32-bit shm integer can hold up to four 8-bit counters.
local CTR_SUCCESS = 0x00000001
local CTR_HTTP = 0x00000100
Expand Down Expand Up @@ -129,7 +142,7 @@ end
-- Some color for demo purposes
local use_color = false
local id = function(x) return x end
local worker_color = use_color and function(str) return ("\027["..tostring(31 + ngx.worker.pid() % 5).."m"..str.."\027[0m") end or id
local worker_color = use_color and function(str) return ("\027["..tostring(31 + ngx_worker_pid() % 5).."m"..str.."\027[0m") end or id

-- Debug function
local function dump(...) print(require("pl.pretty").write({...})) end -- luacheck: ignore 211
Expand Down Expand Up @@ -949,31 +962,45 @@ end
-- results of the checks.


-- @param health_mode either "healthy" or "unhealthy" to indicate what
-- lock to get.
-- @return `true` on success, or false if the lock was not acquired, or `nil + error`
-- in case of errors
function checker:get_periodic_lock(health_mode)
local key = self.PERIODIC_LOCK .. health_mode
local interval = self.checks.active[health_mode].interval

-- The lock is held for the whole interval to prevent multiple
-- worker processes from sending the test request simultaneously.
-- UNLESS: the probing takes longer than the timer interval.
-- Here we substract the lock expiration time by 1ms to prevent
-- a race condition with the next timer event.
local ok, err = self.shm:add(key, true, interval - 0.001)
if not ok then
if err == "exists" then
return false
function checker:get_periodic_lock()
local key = self.PERIODIC_LOCK
local my_pid = ngx_worker_pid()
local checker_pid = self.shm:get(key)

if checker_pid == nil then
-- no worker is checking, try to acquire the lock
local ok, err = self.shm:add(key, my_pid, LOCK_PERIOD)
if not ok then
if err == "exists" then
-- another worker got the lock before
return false
end
self:log(ERR, "failed to add key '", key, "': ", err)
return nil, err
end
self:log(ERR, "failed to add key '", key, "': ", err)
return nil
elseif checker_pid ~= my_pid then
-- another worker is checking
return false
end

return true
end


--- Refresh the periodic lock so it stays valid for another LOCK_PERIOD.
-- Writes the pid of the current worker under the PERIODIC_LOCK key of the
-- shm. A failed write is logged at ERR level and otherwise ignored; nothing
-- is returned to the caller.
function checker:renew_periodic_lock()
  local lock_key = self.PERIODIC_LOCK
  local pid = ngx_worker_pid()

  -- only the error message matters here; the success flag is discarded
  local _, set_err = self.shm:set(lock_key, pid, LOCK_PERIOD)
  if not set_err then
    return
  end

  self:log(ERR, "failed to update key '", lock_key, "': ", set_err)
end


--- Active health check callback function.
-- @param self the checker object this timer runs on
-- @param health_mode either "healthy" or "unhealthy" to indicate what check
Expand Down Expand Up @@ -1372,7 +1399,7 @@ function _M.new(opts)
self.TARGET_LIST = SHM_PREFIX .. self.name .. ":target_list"
self.TARGET_LIST_LOCK = SHM_PREFIX .. self.name .. ":target_list_lock"
self.TARGET_LOCK = SHM_PREFIX .. self.name .. ":target_lock"
self.PERIODIC_LOCK = SHM_PREFIX .. self.name .. ":period_lock:"
self.PERIODIC_LOCK = SHM_PREFIX .. ":period_lock:"
-- prepare constants
self.EVENT_SOURCE = EVENT_SOURCE_PREFIX .. " [" .. self.name .. "]"
self.LOG_PREFIX = LOG_PREFIX .. "(" .. self.name .. ") "
Expand Down Expand Up @@ -1427,33 +1454,51 @@ function _M.new(opts)
if (self.checks.active.healthy.active or self.checks.active.unhealthy.active)
and active_check_timer == nil then

self:log(DEBUG, "starting active check timer")
self:log(DEBUG, "starting timer to check active checks")
local err
active_check_timer, err = resty_timer({
local check_healthcheck_timer, err = resty_timer({
recurring = true,
interval = 0.1, -- evaluate active checks every 0.1s
-- every 10 check intervals, verify whether some worker is actively running
-- the health checks; if none is, this worker tries to take over
interval = CHECK_INTERVAL * 10,
jitter = CHECK_JITTER,
detached = false,
expire = function()
local cur_time = ngx_now()
for _, checker_obj in ipairs(hcs) do
if checker_obj.checks.active.healthy.active and
(checker_obj.checks.active.healthy.last_run +
checker_obj.checks.active.healthy.interval <= cur_time)
then
checker_callback(checker_obj, "healthy")
end

if checker_obj.checks.active.unhealthy.active and
(checker_obj.checks.active.unhealthy.last_run +
checker_obj.checks.active.unhealthy.interval <= cur_time)
then
checker_callback(checker_obj, "unhealthy")
if self:get_periodic_lock() and not active_check_timer then
self:log(DEBUG, "worker ", ngx_worker_id(), " (pid: ", ngx_worker_pid(), ") ",
"starting active check timer")
active_check_timer, err = resty_timer({
recurring = true,
interval = CHECK_INTERVAL,
detached = false,
expire = function()
self:renew_periodic_lock()
local cur_time = ngx_now()
for _, checker_obj in ipairs(hcs) do
if checker_obj.checks.active.healthy.active and
(checker_obj.checks.active.healthy.last_run +
checker_obj.checks.active.healthy.interval <= cur_time)
then
checker_callback(checker_obj, "healthy")
end

if checker_obj.checks.active.unhealthy.active and
(checker_obj.checks.active.unhealthy.last_run +
checker_obj.checks.active.unhealthy.interval <= cur_time)
then
checker_callback(checker_obj, "unhealthy")
end
end
end,
})
if not active_check_timer then
self:log(ERR, "Could not start active check timer: ", err)
end
end
end,
})
if not active_check_timer then
self:log(ERR, "Could not start active check timer: ", err)
if not check_healthcheck_timer then
self:log(ERR, "Could not check active checking: ", err)
end
end

Expand Down
2 changes: 1 addition & 1 deletion t/01-start-stop.t
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ checking
})
local ok, err = checker:stop()
ngx.say(ok)
ngx.sleep(0.2) -- wait twice the interval
ngx.sleep(1) -- active healthchecks might take up to 1s to start
local ok, err = checker:start()
ngx.say(ok)
ngx.sleep(0.2) -- wait twice the interval
Expand Down
11 changes: 4 additions & 7 deletions t/02-add_target.t
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use Cwd qw(cwd);

workers(1);

plan tests => repeat_each() * (blocks() * 4) + 6;
plan tests => repeat_each() * (blocks() * 4) + 3;

my $pwd = cwd();

Expand Down Expand Up @@ -39,7 +39,7 @@ __DATA__
}
}
})
ngx.sleep(0.2) -- wait twice the interval
ngx.sleep(1) -- active healthchecks might take up to 1s to start
local ok, err = checker:add_target("127.0.0.1", 11111, nil, false)
ngx.say(ok)
ngx.sleep(0.2) -- wait twice the interval
Expand All @@ -51,7 +51,6 @@ GET /t
true
--- error_log
checking healthy targets: nothing to do
checking unhealthy targets: nothing to do
checking unhealthy targets: #1

--- no_error_log
Expand Down Expand Up @@ -92,7 +91,7 @@ qq{
}
}
})
ngx.sleep(0.2) -- wait twice the interval
ngx.sleep(1) -- active healthchecks might take up to 1s to start
local ok, err = checker:add_target("127.0.0.1", 2112, nil, true)
ngx.say(ok)
ngx.sleep(0.2) -- wait twice the interval
Expand All @@ -103,7 +102,6 @@ GET /t
--- response_body
true
--- error_log
checking healthy targets: nothing to do
checking unhealthy targets: nothing to do
checking healthy targets: #1

Expand Down Expand Up @@ -148,7 +146,7 @@ qq{
}
}
})
ngx.sleep(0.2) -- wait twice the interval
ngx.sleep(1) -- active healthchecks might take up to 1s to start
local ok, err = checker:add_target("127.0.0.1", 2113, nil, true)
local ok, err = checker:add_target("127.0.0.1", 2113, nil, false)
ngx.say(ok)
Expand All @@ -160,7 +158,6 @@ GET /t
--- response_body
true
--- error_log
checking healthy targets: nothing to do
checking unhealthy targets: nothing to do
checking healthy targets: #1

Expand Down
24 changes: 16 additions & 8 deletions t/09-active_probes.t
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,9 @@ qq{
},
}
})
ngx.sleep(1) -- active healthchecks might take up to 1s to start
local ok, err = checker:add_target("127.0.0.1", 2114, nil, true)
ngx.sleep(0.5) -- wait for 5x the check interval
ngx.sleep(0.6) -- wait for 6x the check interval
ngx.say(checker:get_target_status("127.0.0.1", 2114)) -- false
}
}
Expand Down Expand Up @@ -111,7 +112,8 @@ qq{
}
})
local ok, err = checker:add_target("127.0.0.1", 2114, nil, false)
ngx.sleep(0.5) -- wait for 5x the check interval
ngx.sleep(1) -- active healthchecks might take up to 1s to start
ngx.sleep(0.6) -- wait for 6x the check interval
ngx.say(checker:get_target_status("127.0.0.1", 2114)) -- true
}
}
Expand Down Expand Up @@ -164,8 +166,9 @@ qq{
},
}
})
ngx.sleep(1) -- active healthchecks might take up to 1s to start
local ok, err = checker:add_target("127.0.0.1", 2114, nil, true)
ngx.sleep(0.5) -- wait for 5x the check interval
ngx.sleep(0.6) -- wait for 6x the check interval
ngx.say(checker:get_target_status("127.0.0.1", 2114)) -- true
}
}
Expand Down Expand Up @@ -220,8 +223,9 @@ qq{
},
}
})
ngx.sleep(1) -- active healthchecks might take up to 1s to start
local ok, err = checker:add_target("127.0.0.1", 2114, nil, true)
ngx.sleep(0.5) -- wait for 5x the check interval
ngx.sleep(0.6) -- wait for 6x the check interval
ngx.say(checker:get_target_status("127.0.0.1", 2114)) -- false
}
}
Expand Down Expand Up @@ -281,8 +285,9 @@ qq{
},
}
})
ngx.sleep(1) -- active healthchecks might take up to 1s to start
local ok, err = checker:add_target("127.0.0.1", 2114, "example.com", false)
ngx.sleep(0.3) -- wait for 3x the check interval
ngx.sleep(0.2) -- wait for 2x the check interval
ngx.say(checker:get_target_status("127.0.0.1", 2114, "example.com")) -- true
}
}
Expand All @@ -292,7 +297,7 @@ GET /t
true
--- error_log
event: target status 'example.com(127.0.0.1:2114)' from 'false' to 'true'
checking unhealthy targets: nothing to do
checking unhealthy targets: #1


=== TEST 6: active probes, tcp node failing
Expand Down Expand Up @@ -323,9 +328,10 @@ qq{
},
}
})
ngx.sleep(1) -- active healthchecks might take up to 1s to start
-- Note: no http server configured, so port 2114 remains unanswered
local ok, err = checker:add_target("127.0.0.1", 2114, nil, true)
ngx.sleep(0.5) -- wait for 5x the check interval
ngx.sleep(0.6) -- wait for 6x the check interval
ngx.say(checker:get_target_status("127.0.0.1", 2114)) -- false
}
}
Expand Down Expand Up @@ -379,8 +385,9 @@ qq{
},
}
})
ngx.sleep(1) -- active healthchecks might take up to 1s to start
local ok, err = checker:add_target("127.0.0.1", 2114, nil, false)
ngx.sleep(0.5) -- wait for 5x the check interval
ngx.sleep(0.6) -- wait for 6x the check interval
ngx.say(checker:get_target_status("127.0.0.1", 2114)) -- true
}
}
Expand Down Expand Up @@ -440,6 +447,7 @@ qq{
},
}
})
ngx.sleep(1) -- active healthchecks might take up to 1s to start
local ok, err = checker:add_target("127.0.0.1", 2114, "example.com", false, "custom-host.test")
ngx.sleep(0.3) -- wait for 3x the check interval
ngx.say(checker:get_target_status("127.0.0.1", 2114, "example.com")) -- true
Expand Down
2 changes: 2 additions & 0 deletions t/11-clear.t
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ initial target list (11 targets)
}
local checker1 = healthcheck.new(config)
local checker2 = healthcheck.new(config)
ngx.sleep(1) -- active healthchecks might take up to 1s to start
for i = 1, 10 do
checker1:add_target("127.0.0.1", 20000 + i, nil, false)
end
Expand Down Expand Up @@ -150,6 +151,7 @@ qq{
}
}
local checker1 = healthcheck.new(config)
ngx.sleep(1) -- active healthchecks might take up to 1s to start
checker1:add_target("127.0.0.1", 21120, nil, true)
ngx.sleep(0.5) -- wait 2.5x the interval
checker1:clear()
Expand Down
3 changes: 2 additions & 1 deletion t/15-get_virtualhost_target_status.t
Original file line number Diff line number Diff line change
Expand Up @@ -195,9 +195,10 @@ qq{
},
}
})
ngx.sleep(1) -- active healthchecks might take up to 1s to start
local ok, err = checker:add_target("127.0.0.1", 2117, "healthyserver", true)
local ok, err = checker:add_target("127.0.0.1", 2117, "unhealthyserver", true)
ngx.sleep(0.5) -- wait for 5x the check interval
ngx.sleep(0.6) -- wait for 6x the check interval
ngx.say(checker:get_target_status("127.0.0.1", 2117, "healthyserver")) -- true
ngx.say(checker:get_target_status("127.0.0.1", 2117, "unhealthyserver")) -- false
local _, err = checker:get_target_status("127.0.0.1", 2117)
Expand Down

0 comments on commit 45fc129

Please sign in to comment.