-
Notifications
You must be signed in to change notification settings - Fork 51
/
Copy pathhealthcheck.lua
1456 lines (1205 loc) · 49.6 KB
/
healthcheck.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
--------------------------------------------------------------------------
-- Healthcheck library for OpenResty.
--
-- Some notes on the usage of this library:
--
-- - Each target will have 4 counters, 1 success counter and 3 failure
-- counters ('http', 'tcp', and 'timeout'). Any failure will _only_ reset the
-- success counter, but a success will reset _all three_ failure counters.
--
-- - All targets are uniquely identified by their IP address and port number
-- combination, most functions take those as arguments.
--
-- - All keys in the SHM will be namespaced by the healthchecker name as
-- provided to the `new` function. Hence no collisions will occur on shm-keys
-- as long as the `name` is unique.
--
-- - Active healthchecks will be synchronized across workers, such that only
-- a single active healthcheck runs.
--
-- - Events will be raised in every worker, see [lua-resty-worker-events](https://github.com/Kong/lua-resty-worker-events)
-- for details.
--
-- @copyright 2017-2020 Kong Inc.
-- @author Hisham Muhammad, Thijs Schreijer
-- @license Apache 2.0
local ERR = ngx.ERR
local WARN = ngx.WARN
local DEBUG = ngx.DEBUG
local ngx_log = ngx.log
local tostring = tostring
local ipairs = ipairs
local cjson = require("cjson.safe").new()
local table_remove = table.remove
local utils = require("resty.healthcheck.utils")
local worker_events = require("resty.worker.events")
local resty_lock = require ("resty.lock")
local re_find = ngx.re.find
local bit = require("bit")
local ngx_now = ngx.now
local ssl = require("ngx.ssl")
-- constants
-- NOTE(review): the three prefixes below are not referenced in this part of
-- the file; judging by their names they namespace worker-event sources, log
-- lines and shm keys respectively -- confirm against their users.
local EVENT_SOURCE_PREFIX = "lua-resty-healthcheck"
local LOG_PREFIX = "[healthcheck] "
local SHM_PREFIX = "lua-resty-healthcheck:"
-- Shared, read-only empty table. It serves as a fallback when indexing
-- through nested lookups that may be missing, e.g. `(t[k] or EMPTY)[k2]`.
-- Any attempt to add a key raises an error attributed to the caller
-- (error level 2).
local EMPTY = setmetatable({},{
__newindex = function()
error("the EMPTY table is read only, check your code!", 2)
end
})
-- Counters: a 32-bit shm integer can hold up to four 8-bit counters.
-- Each CTR_xxx constant is the value of "1" in its own 8-bit slot: it is
-- added to the multi-counter to increment that slot, and the multi-counter is
-- divided by it to shift that slot down to the lowest byte (see `ctr_get`).
local CTR_SUCCESS = 0x00000001
local CTR_HTTP = 0x00000100
local CTR_TCP = 0x00010000
local CTR_TIMEOUT = 0x01000000
-- MASK_FAILURE keeps the three failure slots (and thereby clears the success
-- slot); MASK_SUCCESS keeps only the success slot (clearing all failures).
local MASK_FAILURE = 0xffffff00
local MASK_SUCCESS = 0x000000ff
-- Human readable counter names, indexed by CTR_xxx constant; used in logs.
local COUNTER_NAMES = {
[CTR_SUCCESS] = "SUCCESS",
[CTR_HTTP] = "HTTP",
[CTR_TCP] = "TCP",
[CTR_TIMEOUT] = "TIMEOUT",
}
--- The list of potential events generated.
-- The `checker.EVENT_SOURCE` field can be used to subscribe to the events, see the
-- example below. Each of the events will get a table passed containing
-- the target details `ip`, `port`, and `hostname`.
-- See [lua-resty-worker-events](https://github.com/Kong/lua-resty-worker-events).
-- @field remove Event raised when a target is removed from the checker.
-- @field healthy This event is raised when the target status changed to
-- healthy (and when a target is added as `healthy`).
-- @field unhealthy This event is raised when the target status changed to
-- unhealthy (and when a target is added as `unhealthy`).
-- @field mostly_healthy This event is raised when the target status is
-- still healthy but it started to receive "unhealthy" updates via active or
-- passive checks.
-- @field mostly_unhealthy This event is raised when the target status is
-- still unhealthy but it started to receive "healthy" updates via active or
-- passive checks.
-- @field clear Event raised by `checker:clear` after all targets have been
-- removed; it is raised without target details.
-- @table checker.events
-- @usage -- Register for all events from `my_checker`
-- local event_callback = function(target, event, source, source_PID)
--   local t = target.ip .. ":" .. target.port .. " by name '" ..
--             target.hostname .. "' "
--
--   if event == my_checker.events.remove then
--     print(t .. "has been removed")
--   elseif event == my_checker.events.healthy then
--     print(t .. "is now healthy")
--   elseif event == my_checker.events.unhealthy then
--     print(t .. "is now unhealthy")
--   end
-- end
--
-- worker_events.register(event_callback, my_checker.EVENT_SOURCE)

-- Looking up an event name that was not registered below raises an error
-- instead of silently yielding `nil`.
local EVENTS = setmetatable({}, {
__index = function(self, key)
error(("'%s' is not a valid event name"):format(tostring(key)))
end
})
-- Register every valid event name; each name maps to itself.
for _, event in ipairs({
"remove",
"healthy",
"unhealthy",
"mostly_healthy",
"mostly_unhealthy",
"clear",
}) do
EVENTS[event] = event
end
-- Two-way lookup between the internal health state names and the numeric ids
-- stored in the shm: INTERNAL_STATES[name] -> id and INTERNAL_STATES[id] -> name.
local INTERNAL_STATES = {}
do
  local state_names = {
    "healthy",
    "unhealthy",
    "mostly_healthy",
    "mostly_unhealthy",
  }
  for state_id = 1, #state_names do
    local state_name = state_names[state_id]
    INTERNAL_STATES[state_id] = state_name
    INTERNAL_STATES[state_name] = state_id
  end
end
-- Some color for demo purposes
-- Coloring is switched off by default. When `use_color` is set,
-- `worker_color` wraps a string in an ANSI color escape derived from the
-- worker PID; otherwise it is the identity function `id`.
local use_color = false
local id = function(x) return x end
local worker_color = use_color and function(str) return ("\027["..tostring(31 + ngx.worker.pid() % 5).."m"..str.."\027[0m") end or id
-- Debug function
-- Prints its arguments (packed into a table) through `pl.pretty`, which is
-- only required at the moment `dump` is actually called.
local function dump(...) print(require("pl.pretty").write({...})) end -- luacheck: ignore 211
-- NOTE(review): `_M` is not used in this part of the file; presumably it is
-- the module table handed back to `require` callers -- confirm at end of file.
local _M = {}
-- TODO: improve serialization speed
-- serialize a table to a string
-- @param t the table to encode
-- @return whatever `cjson.encode` returns for `t`. `cjson` is the
-- `cjson.safe` variant, so an encoding failure is expected to come back as
-- `nil + error` instead of being raised (per the lua-cjson documentation).
local function serialize(t)
return cjson.encode(t)
end
-- deserialize a string to a table
-- @param s the JSON string to decode
-- @return whatever `cjson.decode` returns for `s`. `cjson` is the
-- `cjson.safe` variant, so a decoding failure is expected to come back as
-- `nil + error` instead of being raised (per the lua-cjson documentation).
local function deserialize(s)
return cjson.decode(s)
end
-- Build the shm key that belongs to a single target.
-- @param key_prefix namespace prefix of the key
-- @param ip IP address of the target
-- @param port port number of the target
-- @param hostname (optional) hostname of the target; when given it is
-- appended as an extra `:hostname` segment
-- @return the key, formatted as `prefix:ip:port[:hostname]`
local function key_for(key_prefix, ip, port, hostname)
  local hostname_suffix = ""
  if hostname then
    hostname_suffix = ":" .. hostname
  end
  return string.format("%s:%s:%s%s", key_prefix, ip, port, hostname_suffix)
end
-- Table holding the checker methods defined below (`function checker:...`).
local checker = {}
------------------------------------------------------------------------------
-- Node management.
-- @section node-management
------------------------------------------------------------------------------
-- Read the target list of this checker from the shm and decode it.
-- @param self The checker object
-- @return the target list from the shm, an empty table if nothing (usable)
-- is stored, or `nil + error` when the shm lookup itself fails
local function fetch_target_list(self)
  local raw_list, shm_err = self.shm:get(self.TARGET_LIST)
  if shm_err then
    return nil, "failed to fetch target_list from shm: " .. shm_err
  end

  if raw_list then
    local decoded = deserialize(raw_list)
    if decoded then
      return decoded
    end
  end

  -- nothing stored, or the stored value did not decode to a usable value
  return {}
end
--- Helper function to run the function holding a lock on the target list.
-- The target list is fetched from the shm while the lock is held and passed
-- to `fn`. The lock is always released afterwards, also when `fn` raises.
-- @param premature timer argument; when truthy nothing is done at all
-- @param self The checker object
-- @param fn The function to execute, called as `fn(target_list)`
-- @return the first two results of `fn`; or `nil + error` when the lock
-- cannot be created or acquired, the target list cannot be fetched, or `fn`
-- raises. The exact message `"failed to acquire lock"` means that the lock
-- call itself raised; `locking_target_list` reacts to it by retrying in a
-- timer.
-- @see locking_target_list
local function run_fn_locked_target_list(premature, self, fn)
  if premature then
    return
  end

  local lock, lock_err = resty_lock:new(self.shm_name, {
    exptime = 10,  -- timeout after which lock is released anyway
    timeout = 5,   -- max wait time to acquire lock
  })
  if not lock then
    return nil, "failed to create lock:" .. lock_err
  end

  -- The lock call is protected: it may raise (presumably in phases where
  -- waiting for the lock is not allowed), which must not take the caller down.
  local pok, elapsed, acquire_err = pcall(resty_lock.lock, lock,
                                          self.TARGET_LIST_LOCK)
  if not pok then
    -- on a raised error the second pcall result is the error message
    self:log(DEBUG, "failed to acquire lock: ", elapsed)
    return nil, "failed to acquire lock"
  end
  if not elapsed then
    -- lock() returned `nil, err` (e.g. "timeout"): we do NOT hold the lock,
    -- so `fn` must not run as if it were protected
    return nil, "failed to acquire lock: " .. tostring(acquire_err)
  end

  local final_ok, final_err
  local target_list, err = fetch_target_list(self)
  if target_list then
    -- pcall prepends its own status; unpack it so that the caller receives
    -- the results of `fn` itself rather than pcall's `true`
    local fn_ok, res, res_err = pcall(fn, target_list)
    if fn_ok then
      final_ok, final_err = res, res_err
    else
      final_ok, final_err = nil, res
    end
  else
    final_ok, final_err = nil, err
  end

  local ok
  ok, err = lock:unlock()
  if not ok then
    -- recoverable: not returning this error, only logging it
    self:log(ERR, "failed to release lock '", self.TARGET_LIST_LOCK,
             "': ", err)
  end

  return final_ok, final_err
end
--- Run the given function holding a lock on the target list.
-- When the lock cannot be acquired in the current context, the call is
-- retried from a zero-delay timer instead.
-- @param self The checker object
-- @param fn The function to execute
-- @return whatever `run_fn_locked_target_list` returns; or `true` when the
-- call was rescheduled in a timer; or `nil` and an error message when
-- creating that timer fails.
local function locking_target_list(self, fn)
  local ok, err = run_fn_locked_target_list(false, self, fn)
  if err ~= "failed to acquire lock" then
    return ok, err
  end

  -- could not lock right here: hand the work over to a timer
  local _, timer_err = ngx.timer.at(0, run_fn_locked_target_list, self, fn)
  if timer_err ~= nil then
    return nil, timer_err
  end

  return true
end
--- Get a target from the local (per worker) lookup table.
-- @param self The checker object
-- @param ip IP address of the target
-- @param port port number of the target
-- @param hostname (optional) hostname of the target, defaults to `ip`
-- @return the entry at `self.targets[ip][port][hostname]`, or `nil` if any
-- level of that lookup is missing
local function get_target(self, ip, port, hostname)
  local targets_by_port = self.targets[ip] or EMPTY
  local targets_by_hostname = targets_by_port[port] or EMPTY
  return targets_by_hostname[hostname or ip]
end
--- Add a target to the healthchecker.
-- When the ip + port + hostname combination already exists, it will simply
-- return success (without updating `is_healthy` status).
-- @param ip IP address of the target to check.
-- @param port the port to check against.
-- @param hostname (optional) hostname to set as the host header in the HTTP
-- probe request
-- @param is_healthy (optional) a boolean value indicating the initial state,
-- default is `true`.
-- @param hostheader (optional) a value to use for the Host header on
-- active healthchecks.
-- @return `true` on success, or `nil + error` on failure.
function checker:add_target(ip, port, hostname, is_healthy, hostheader)
  ip = tostring(assert(ip, "no ip address provided"))
  port = assert(tonumber(port), "no port number provided")

  -- only an explicit `false` starts the target as unhealthy
  local internal_health = "healthy"
  if is_healthy == false then
    internal_health = "unhealthy"
  end

  local ok, err = locking_target_list(self, function(target_list)
    -- bail out when the target is already known
    for _, known in ipairs(target_list) do
      local same_target = known.ip == ip
                          and known.port == port
                          and known.hostname == hostname
      if same_target then
        self:log(DEBUG, "adding an existing target: ", hostname or "", " ", ip,
                 ":", port, " (ignoring)")
        return false
      end
    end

    -- The internal health is written before the updated list, so that a
    -- target never shows up in the list without having a state key in the
    -- shm.
    local state_key = key_for(self.TARGET_STATE, ip, port, hostname)
    local set_ok, set_err = self.shm:set(state_key,
                                         INTERNAL_STATES[internal_health])
    if not set_ok then
      self:log(ERR, "failed to set initial health status in shm: ", set_err)
    end

    -- append the new target and persist the list
    target_list[#target_list + 1] = {
      ip = ip,
      port = port,
      hostname = hostname,
      hostheader = hostheader,
    }
    local encoded_list = serialize(target_list)

    set_ok, set_err = self.shm:set(self.TARGET_LIST, encoded_list)
    if not set_ok then
      return nil, "failed to store target_list in shm: " .. set_err
    end

    -- let everybody know about the newly added target
    self:raise_event(self.events[internal_health], ip, port, hostname)

    return true
  end)

  if ok == false then
    -- the target already existed, no event, but still success
    return true
  end

  return ok, err
end
-- Remove the shm entries (health state and counters) of a single target.
-- Failures are logged only; nothing is returned.
-- @param self The checker object
-- @param ip IP address of the target being checked.
-- @param port the port being checked against.
-- @param hostname hostname of the target being checked.
local function clear_target_data_from_shm(self, ip, port, hostname)
  local state_key = key_for(self.TARGET_STATE, ip, port, hostname)
  local state_ok, state_err = self.shm:set(state_key, nil)
  if not state_ok then
    self:log(ERR, "failed to remove health status from shm: ", state_err)
  end

  local counter_key = key_for(self.TARGET_COUNTER, ip, port, hostname)
  local counter_ok, counter_err = self.shm:set(counter_key, nil)
  if not counter_ok then
    self:log(ERR, "failed to clear health counter from shm: ", counter_err)
  end
end
--- Remove a target from the healthchecker.
-- The target not existing is not considered an error.
-- @param ip IP address of the target being checked.
-- @param port the port being checked against.
-- @param hostname (optional) hostname of the target being checked.
-- @return `true` on success, or `nil + error` on failure.
function checker:remove_target(ip, port, hostname)
  ip = tostring(assert(ip, "no ip address provided"))
  port = assert(tonumber(port), "no port number provided")

  return locking_target_list(self, function(target_list)
    -- locate the target in the list
    local found_at
    for position, candidate in ipairs(target_list) do
      if candidate.ip == ip
         and candidate.port == port
         and candidate.hostname == hostname
      then
        found_at = position
        break
      end
    end

    if not found_at then
      -- unknown target: nothing to do
      return true
    end

    table_remove(target_list, found_at)
    local encoded_list = serialize(target_list)

    -- The updated list is written first and the health status is removed
    -- only afterwards; this prevents race conditions when a healthchecker
    -- gets the initial state from the shm.
    local ok, err = self.shm:set(self.TARGET_LIST, encoded_list)
    if not ok then
      return nil, "failed to store target_list in shm: " .. err
    end

    clear_target_data_from_shm(self, ip, port, hostname)

    -- let everybody know about the removed target
    self:raise_event(self.events.remove, ip, port, hostname)

    return true
  end)
end
--- Clear all healthcheck data.
-- Stores an empty target list in the shm, removes the shm entries of every
-- target that was on the list, resets the local target table and raises the
-- `clear` event.
-- @return `true` on success, or `nil + error` on failure.
function checker:clear()
  return locking_target_list(self, function(previous_targets)
    -- persist an empty list first
    local empty_list = serialize({})
    local ok, err = self.shm:set(self.TARGET_LIST, empty_list)
    if not ok then
      return nil, "failed to store target_list in shm: " .. err
    end

    -- then drop the individual state of every former target
    for _, former in ipairs(previous_targets) do
      clear_target_data_from_shm(self, former.ip, former.port, former.hostname)
    end

    self.targets = {}

    -- announce that everything has been cleared
    self:raise_event(self.events.clear)

    return true
  end)
end
--- Get the current status of the target.
-- Both the "healthy" and the "mostly_healthy" internal states are reported
-- as healthy.
-- @param ip IP address of the target being checked.
-- @param port the port being checked against.
-- @param hostname the hostname of the target being checked.
-- @return `true` if healthy, `false` if unhealthy, or `nil + error` on failure.
function checker:get_target_status(ip, port, hostname)
  local target = get_target(self, ip, port, hostname)
  if target then
    local health = target.internal_health
    return health == "healthy" or health == "mostly_healthy"
  end

  return nil, "target not found"
end
------------------------------------------------------------------------------
-- Health management.
-- Functions that allow reporting of failures/successes for passive checks.
-- @section health-management
------------------------------------------------------------------------------
--- Helper function to actually run the function holding a lock on the target.
-- The lock is always released afterwards, also when `fn` raises.
-- @param premature timer argument; when truthy nothing is done at all
-- @param self The checker object
-- @param ip Target IP
-- @param port Target port
-- @param hostname Target hostname
-- @param fn The function to execute, called without arguments
-- @return the first two results of `fn`; or `nil + error` when the lock
-- cannot be created or acquired, or when `fn` raises. The exact message
-- `"failed to acquire lock"` means that the lock call itself raised;
-- `locking_target` reacts to it by retrying in a timer.
-- @see locking_target
local function run_mutexed_fn(premature, self, ip, port, hostname, fn)
  if premature then
    return
  end

  local lock, lock_err = resty_lock:new(self.shm_name, {
    exptime = 10,  -- timeout after which lock is released anyway
    timeout = 5,   -- max wait time to acquire lock
  })
  if not lock then
    return nil, "failed to create lock:" .. lock_err
  end

  local lock_key = key_for(self.TARGET_LOCK, ip, port, hostname)

  -- The lock call is protected: it may raise (presumably in phases where
  -- waiting for the lock is not allowed), which must not take the caller down.
  local pok, elapsed, acquire_err = pcall(resty_lock.lock, lock, lock_key)
  if not pok then
    -- on a raised error the second pcall result is the error message
    self:log(DEBUG, "failed to acquire lock: ", elapsed)
    return nil, "failed to acquire lock"
  end
  if not elapsed then
    -- lock() returned `nil, err` (e.g. "timeout"): we do NOT hold the lock,
    -- so `fn` must not run as if it were protected
    return nil, "failed to acquire lock: " .. tostring(acquire_err)
  end

  -- pcall prepends its own status; unpack it so that the caller receives the
  -- results of `fn` itself rather than pcall's `true`
  local final_ok, final_err
  local fn_ok, res, res_err = pcall(fn)
  if fn_ok then
    final_ok, final_err = res, res_err
  else
    final_ok, final_err = nil, res
  end

  local ok, err = lock:unlock()
  if not ok then
    -- recoverable: not returning this error, only logging it
    self:log(ERR, "failed to release lock '", lock_key, "': ", err)
  end

  return final_ok, final_err
end
-- Run the given function holding a lock on the target.
-- When the lock cannot be acquired in the current context, the call is
-- retried from a zero-delay timer instead.
-- @param self The checker object
-- @param ip Target IP
-- @param port Target port
-- @param hostname Target hostname
-- @param fn The function to execute
-- @return whatever `run_mutexed_fn` returns; or true in case it fails
-- locking and will retry asynchronously; or nil+err in case it fails to
-- retry.
local function locking_target(self, ip, port, hostname, fn)
  local ok, err = run_mutexed_fn(false, self, ip, port, hostname, fn)
  if err ~= "failed to acquire lock" then
    return ok, err
  end

  -- could not lock right here: hand the work over to a timer
  local _, timer_err = ngx.timer.at(0, run_mutexed_fn, self, ip, port,
                                    hostname, fn)
  if timer_err ~= nil then
    return nil, timer_err
  end

  return true
end
-- Extract the value of the counter at `idx` from multi-counter `multictr`.
-- The division moves the requested 8-bit slot down to the lowest byte. Its
-- result is floored explicitly before the low byte is taken: the quotient is
-- fractional whenever a lower slot is non-zero, and handing a fractional
-- number to a bit operation converts it in an implementation-defined way
-- (rounding to nearest would bump this counter by one as soon as the slot
-- below it holds 0x80 or more).
-- @param multictr A 32-bit multi-counter holding 4 values.
-- @param idx The shift index specifying which counter to get, one of the
-- CTR_xxx constants.
-- @return The 8-bit value extracted from the 32-bit multi-counter.
local function ctr_get(multictr, idx)
  return math.floor(multictr / idx) % 0x100
end
-- Increment the healthy or unhealthy counter. If the threshold of occurrences
-- is reached, it changes the status of the target in the shm and posts an
-- event.
-- Incrementing the success counter clears all failure counters, and
-- incrementing a failure counter clears the success counter.
-- @param self The checker object
-- @param health_report "healthy" for the success counter that drives a target
-- towards the healthy state; "unhealthy" for the failure counter.
-- @param ip Target IP
-- @param port Target port
-- @param hostname Target hostname
-- @param limit the limit after which target status is changed; `0` disables
-- the counter, making the call a no-op
-- @param ctr_type the counter to increment, see CTR_xxx constants
-- @return whatever `locking_target` returns for the update, or `true` right
-- away when there is nothing to count (counter disabled, unknown target, or
-- the target already is in the reported state).
local function incr_counter(self, health_report, ip, port, hostname, limit, ctr_type)
-- fail fast on counters that are disabled by configuration
if limit == 0 then
return true
end
port = tonumber(port)
local target = get_target(self, ip, port, hostname)
if not target then
-- sync issue: warn, but return success
self:log(WARN, "trying to increment a target that is not in the list: ",
hostname and "(" .. hostname .. ") " or "", ip, ":", port)
return true
end
local current_health = target.internal_health
if health_report == current_health then
-- No need to count successes when internal health is fully "healthy"
-- or failures when internal health is fully "unhealthy"
return true
end
return locking_target(self, ip, port, hostname, function()
local counter_key = key_for(self.TARGET_COUNTER, ip, port, hostname)
-- add "1" in the slot of `ctr_type`; the third argument is the initial
-- value used when the key does not exist yet
local multictr, err = self.shm:incr(counter_key, ctr_type, 0)
if err then
return nil, err
end
-- value of the counter that was just incremented
local ctr = ctr_get(multictr, ctr_type)
self:log(WARN, health_report, " ", COUNTER_NAMES[ctr_type],
" increment (", ctr, "/", limit, ") for '", hostname or "",
"(", ip, ":", port, ")'")
-- reset the opposite counter(s): a success keeps only the success slot, a
-- failure keeps only the failure slots
-- NOTE(review): with LuaJIT's `bit` library results are signed 32-bit, so
-- a timeout counter of 0x80 or more would make the masked value negative
-- -- confirm that limits stay below that.
local new_multictr
if ctr_type == CTR_SUCCESS then
new_multictr = bit.band(multictr, MASK_SUCCESS)
else
new_multictr = bit.band(multictr, MASK_FAILURE)
end
-- only write back when the masking actually cleared something
if new_multictr ~= multictr then
self.shm:set(counter_key, new_multictr)
end
-- threshold reached: switch to the reported state; otherwise a fully
-- healthy/unhealthy target with opposite reports pending becomes "mostly_..."
local new_health
if ctr >= limit then
new_health = health_report
elseif current_health == "healthy" and bit.band(new_multictr, MASK_FAILURE) > 0 then
new_health = "mostly_healthy"
elseif current_health == "unhealthy" and bit.band(new_multictr, MASK_SUCCESS) > 0 then
new_health = "mostly_unhealthy"
end
-- store the new state and raise the event only on an actual change
if new_health and new_health ~= current_health then
local state_key = key_for(self.TARGET_STATE, ip, port, hostname)
self.shm:set(state_key, INTERNAL_STATES[new_health])
self:raise_event(self.events[new_health], ip, port, hostname)
end
return true
end)
end
--- Report a health failure.
-- Reports a health failure which will count against the number of occurrences
-- required to make a target "fall". The type of healthchecker,
-- "tcp" or "http" (see `new`) determines against which counter the occurrence
-- goes: the TCP counter for type "tcp", the HTTP counter for anything else.
-- If `unhealthy.tcp_failures` (for TCP failures) or `unhealthy.http_failures`
-- is set to zero in the configuration, this function is a no-op
-- and returns `true`.
-- @param ip IP address of the target being checked.
-- @param port the port being checked against.
-- @param hostname (optional) hostname of the target being checked.
-- @param check (optional) the type of check, either "passive" or "active", default "passive".
-- @return `true` on success, or `nil + error` on failure.
function checker:report_failure(ip, port, hostname, check)
  local checks = self.checks[check or "passive"]

  if checks.type == "tcp" then
    return incr_counter(self, "unhealthy", ip, port, hostname,
                        checks.unhealthy.tcp_failures, CTR_TCP)
  end

  return incr_counter(self, "unhealthy", ip, port, hostname,
                      checks.unhealthy.http_failures, CTR_HTTP)
end
--- Report a health success.
-- Reports a health success which will count against the number of occurrences
-- required to make a target "rise".
-- If `healthy.successes` is set to zero in the configuration,
-- this function is a no-op and returns `true`.
-- @param ip IP address of the target being checked.
-- @param port the port being checked against.
-- @param hostname (optional) hostname of the target being checked.
-- @param check (optional) the type of check, either "passive" or "active", default "passive".
-- @return `true` on success, or `nil + error` on failure.
function checker:report_success(ip, port, hostname, check)
  local checks = self.checks[check or "passive"]
  local successes_needed = checks.healthy.successes

  return incr_counter(self, "healthy", ip, port, hostname,
                      successes_needed, CTR_SUCCESS)
end
--- Report a http response code.
-- How the code is interpreted is based on the configuration for healthy and
-- unhealthy statuses. If it is in neither strategy, it will be ignored.
-- A missing or non-numeric status is treated as code `0`, which always counts
-- as an unhealthy response.
-- If `healthy.successes` (for healthy HTTP status codes)
-- or `unhealthy.http_failures` (for unhealthy HTTP status codes)
-- is set to zero in the configuration, this function is a no-op
-- and returns `true`.
-- @param ip IP address of the target being checked.
-- @param port the port being checked against.
-- @param hostname (optional) hostname of the target being checked.
-- @param http_status the http statuscode, or nil to report an invalid http response.
-- @param check (optional) the type of check, either "passive" or "active", default "passive".
-- @return `true` on success, `nil` if the status was ignored (not in active or
-- passive health check lists) or `nil + error` on failure.
function checker:report_http_status(ip, port, hostname, http_status, check)
  local status_code = tonumber(http_status) or 0
  local checks = self.checks[check or "passive"]

  if checks.healthy.http_statuses[status_code] then
    return incr_counter(self, "healthy", ip, port, hostname,
                        checks.healthy.successes, CTR_SUCCESS)
  end

  if checks.unhealthy.http_statuses[status_code] or status_code == 0 then
    return incr_counter(self, "unhealthy", ip, port, hostname,
                        checks.unhealthy.http_failures, CTR_HTTP)
  end

  -- the status is in neither list: ignore it
end
--- Report a failure on TCP level.
-- If `unhealthy.tcp_failures` is set to zero in the configuration,
-- this function is a no-op and returns `true`.
-- @param ip IP address of the target being checked.
-- @param port the port being checked against.
-- @param hostname hostname of the target being checked.
-- @param operation The socket operation that failed:
-- "connect", "send" or "receive". Currently not used.
-- TODO check what kind of information we get from the OpenResty layer
-- in order to tell these error conditions apart
-- https://github.com/openresty/lua-resty-core/blob/master/lib/ngx/balancer.md#get_last_failure
-- @param check (optional) the type of check, either "passive" or "active", default "passive".
-- @return `true` on success, or `nil + error` on failure.
function checker:report_tcp_failure(ip, port, hostname, operation, check)
  local unhealthy = self.checks[check or "passive"].unhealthy

  -- TODO what do we do with the `operation` information
  return incr_counter(self, "unhealthy", ip, port, hostname,
                      unhealthy.tcp_failures, CTR_TCP)
end
--- Report a timeout failure.
-- If `unhealthy.timeouts` is set to zero in the configuration,
-- this function is a no-op and returns `true`.
-- @param ip IP address of the target being checked.
-- @param port the port being checked against.
-- @param hostname (optional) hostname of the target being checked.
-- @param check (optional) the type of check, either "passive" or "active", default "passive".
-- @return `true` on success, or `nil + error` on failure.
function checker:report_timeout(ip, port, hostname, check)
  local unhealthy = self.checks[check or "passive"].unhealthy

  return incr_counter(self, "unhealthy", ip, port, hostname,
                      unhealthy.timeouts, CTR_TIMEOUT)
end
--- Sets the current status of all targets with the given hostname and port.
-- Every matching target is updated through `set_target_status`; a failure on
-- one target does not stop the others from being updated.
-- @param hostname hostname being checked.
-- @param port the port being checked against
-- @param is_healthy boolean: `true` for healthy, `false` for unhealthy
-- @return `true` on success, or `nil + error` on failure; the error holds the
-- messages of all failed updates, joined by "; ".
function checker:set_all_target_statuses_for_hostname(hostname, port, is_healthy)
  assert(type(hostname) == "string", "no hostname provided")
  port = assert(tonumber(port), "no port number provided")
  assert(type(is_healthy) == "boolean")

  local succeeded = true
  local messages = {}

  for _, target in ipairs(self.targets) do
    local matches = target.port == port and target.hostname == hostname
    if matches then
      local ok, err = self:set_target_status(target.ip, port, hostname, is_healthy)
      if not ok then
        succeeded = nil
        messages[#messages + 1] = err
      end
    end
  end

  local combined
  if #messages > 0 then
    combined = table.concat(messages, "; ")
  end

  return succeeded, combined
end
--- Sets the current status of the target.
-- This will immediately set the status and clear its counters.
-- A target that is not known to this checker is only logged; the call still
-- returns `true` in that case.
-- @param ip IP address of the target being checked
-- @param port the port being checked against
-- @param hostname (optional) hostname of the target being checked.
-- @param is_healthy boolean: `true` for healthy, `false` for unhealthy
-- @return `true` on success, or `nil + error` on failure
function checker:set_target_status(ip, port, hostname, is_healthy)
  ip = tostring(assert(ip, "no ip address provided"))
  port = assert(tonumber(port), "no port number provided")
  assert(type(is_healthy) == "boolean")

  local health_report = is_healthy and "healthy" or "unhealthy"

  local target = get_target(self, ip, port, hostname)
  if not target then
    -- sync issue: warn, but return success
    self:log(WARN, "trying to set status for a target that is not in the list: ", ip, ":", port)
    return true
  end

  local counter_key = key_for(self.TARGET_COUNTER, ip, port, hostname)
  local state_key = key_for(self.TARGET_STATE, ip, port, hostname)

  local ok, err = locking_target(self, ip, port, hostname, function()
    -- reset all counters of the target
    local _, counter_err = self.shm:set(counter_key, 0)
    if counter_err then
      return nil, counter_err
    end

    -- store the forced state; the result of THIS write is what gets checked
    -- (checking the error of the counter write again would never catch a
    -- failure to store the state)
    local _, state_err = self.shm:set(state_key, INTERNAL_STATES[health_report])
    if state_err then
      return nil, state_err
    end

    self:raise_event(self.events[health_report], ip, port, hostname)

    return true
  end)

  if ok then
    self:log(WARN, health_report, " forced for ", hostname, " ", ip, ":", port)
  end

  return ok, err
end
-- Introspection function for testing
-- Reads, while holding the target lock, the raw multi-counter of a target
-- from the shm and the internal health this worker has on record for it.
-- @param self The checker object
-- @param ip Target IP
-- @param port Target port
-- @param hostname Target hostname
-- @return whatever `locking_target` returns for a function that yields
-- `counter, internal_health`
local function test_get_counter(self, ip, port, hostname)
  local function read_counter_and_health()
    local counter_key = key_for(self.TARGET_COUNTER, ip, port, hostname)
    local counter = self.shm:get(counter_key)
    local target = get_target(self, ip, port, hostname) or EMPTY
    return counter, target.internal_health
  end

  return locking_target(self, ip, port, hostname, read_counter_and_health)
end
--============================================================================
-- Healthcheck runner
--============================================================================
-- Runs a single healthcheck probe
-- Connects to the target and, depending on the configured active check type,
-- reports the outcome through the `report_*` methods:
--
-- - "tcp": a successful connect is reported as a success.
-- - "https": an SSL/TLS handshake is done first (with the client certificate
--   when `self.ssl_cert` and `self.ssl_key` are set), then the probe continues
--   as for http.
-- - anything else: a `GET <http_path> HTTP/1.0` request is sent and the
--   status code of the response is reported.
--
-- Timeouts are reported as timeouts, other socket errors as TCP failures.
-- @param ip IP address of the target to probe
-- @param port port of the target to probe
-- @param hostname (optional) hostname of the target
-- @param hostheader (optional) value for the `Host` header; falls back to
-- `hostname` and then to `ip`
-- @return the result of the `report_*` call that was made, or nothing when
-- the socket could not be created
function checker:run_single_check(ip, port, hostname, hostheader)
  local sock, err = ngx.socket.tcp()
  if not sock then
    self:log(ERR, "failed to create stream socket: ", err)
    return
  end

  -- the configured timeout is in seconds, the socket wants milliseconds
  sock:settimeout(self.checks.active.timeout * 1000)

  local ok
  ok, err = sock:connect(ip, port)
  if not ok then
    if err == "timeout" then
      sock:close()  -- timeout errors do not close the socket.
      return self:report_timeout(ip, port, hostname, "active")
    end
    return self:report_tcp_failure(ip, port, hostname, "connect", "active")
  end

  if self.checks.active.type == "tcp" then
    -- being able to connect is all a tcp check asks for
    sock:close()
    return self:report_success(ip, port, hostname, "active")
  end

  if self.checks.active.type == "https" then
    local https_sni = self.checks.active.https_sni or hostheader or hostname
    local session, handshake_err

    if self.ssl_cert and self.ssl_key then
      session, handshake_err = sock:tlshandshake({
        verify = self.checks.active.https_verify_certificate,
        client_cert = self.ssl_cert,
        client_priv_key = self.ssl_key,
        server_name = https_sni
      })
    else
      session, handshake_err = sock:sslhandshake(nil, https_sni,
                                 self.checks.active.https_verify_certificate)
    end

    if not session then
      sock:close()
      self:log(ERR, "failed SSL handshake with '", hostname or "", " (", ip, ":", port, ")', using server name (sni) '", https_sni, "': ", handshake_err)
      return self:report_tcp_failure(ip, port, hostname, "connect", "active")
    end
  end

  local path = self.checks.active.http_path
  -- Targets added by IP only have neither a host header nor a hostname; fall
  -- back to the IP so that the request never carries a bogus `Host` value.
  local request = ("GET %s HTTP/1.0\r\nHost: %s\r\n\r\n"):format(path, hostheader or hostname or ip)

  local bytes
  bytes, err = sock:send(request)
  if not bytes then
    self:log(ERR, "failed to send http request to '", hostname or "", " (", ip, ":", port, ")': ", err)
    if err == "timeout" then
      sock:close()  -- timeout errors do not close the socket.
      return self:report_timeout(ip, port, hostname, "active")
    end
    return self:report_tcp_failure(ip, port, hostname, "send", "active")
  end

  local status_line
  status_line, err = sock:receive()
  if not status_line then
    self:log(ERR, "failed to receive status line from '", hostname or "", " (",ip, ":", port, ")': ", err)
    if err == "timeout" then
      sock:close()  -- timeout errors do not close the socket.
      return self:report_timeout(ip, port, hostname, "active")
    end
    return self:report_tcp_failure(ip, port, hostname, "receive", "active")
  end

  -- the trailing `1` makes re_find return the position of capture group 1,
  -- i.e. of the status code itself
  local from, to = re_find(status_line,
                           [[^HTTP/\d+\.\d+\s+(\d+)]],
                           "joi", nil, 1)
  local status
  if from then
    status = tonumber(status_line:sub(from, to))
  else
    self:log(ERR, "bad status line from '", hostname or "", " (", ip, ":", port, ")': ", status_line)
    -- note: 'status' will be reported as 'nil'
  end

  sock:close()

  self:log(DEBUG, "Reporting '", hostname or "", " (", ip, ":", port, ")' (got HTTP ", status, ")")

  return self:report_http_status(ip, port, hostname, status, "active")
end
-- executes a work package (a list of checks) sequentially
-- @param work_package array of work items; of each item the fields `ip`,
-- `port`, `hostname`, `hostheader` and `debug_health` are read. The probe is
-- run with the item's `hostheader`, or its `hostname` when that is not set.
function checker:run_work_package(work_package)
  for _, item in ipairs(work_package) do
    local hostheader_note = ""
    if item.hostheader then
      hostheader_note = "(host header: " .. item.hostheader .. ")"
    end

    self:log(DEBUG, "Checking ", item.hostname or "", " ", hostheader_note,
             item.ip, ":", item.port,
             " (currently ", item.debug_health, ")")

    self:run_single_check(item.ip, item.port, item.hostname,
                          item.hostheader or item.hostname)
  end
end
-- runs the active healthchecks concurrently, in multiple work packages.
-- The targets are dealt out round-robin over at most
-- `checks.active.concurrency` work packages. All packages but the last one
-- are run in their own light thread, the last one is run by the caller
-- itself. The function returns once every package has finished.
-- An empty list is a no-op.
-- @param list the list of targets to check
function checker:active_check_targets(list)
  local idx = 1
  local work_packages = {}

  for _, work_item in ipairs(list) do
    local package = work_packages[idx]
    if not package then
      package = {}
      work_packages[idx] = package
    end
    package[#package + 1] = work_item
    idx = idx + 1
    if idx > self.checks.active.concurrency then idx = 1 end
  end

  -- Nothing to check: without this guard the "last package" below would be
  -- `nil` and running it would raise.
  if #work_packages == 0 then
    return
  end

  -- hand out work-packages to the threads, note the "-1" because this timer
  -- thread will handle the last package itself.
  local threads = {}
  for i = 1, #work_packages - 1 do
    threads[i] = ngx.thread.spawn(self.run_work_package, self, work_packages[i])
  end

  -- run last package myself
  self:run_work_package(work_packages[#work_packages])

  -- wait for everybody to finish
  for _, thread in ipairs(threads) do
    ngx.thread.wait(thread)
  end
end
--============================================================================
-- Internal callbacks, timers and events
--============================================================================
-- The timer callbacks are responsible for checking the status, upon success/
-- failure they will call the health-management functions to deal with the
-- results of the checks.

-- Try to obtain the periodic lock that elects the worker running the active
-- healthchecks of the given mode for the current interval.
-- @param health_mode either "healthy" or "unhealthy" to indicate what
-- lock to get.
-- @return `true` on success, or false if the lock was not acquired, or `nil + error`
-- in case of errors
function checker:get_periodic_lock(health_mode)
  local key = self.PERIODIC_LOCK .. health_mode
  local interval = self.checks.active[health_mode].interval

  -- The lock is held for the whole interval to prevent multiple
  -- worker processes from sending the test request simultaneously.
  -- UNLESS: the probing takes longer than the timer interval.
  -- Here we subtract 1ms from the lock expiration time to prevent
  -- a race condition with the next timer event.
  local ok, err = self.shm:add(key, true, interval - 0.001)
  if ok then
    return true
  end

  if err == "exists" then
    -- somebody else holds the lock for this interval
    return false
  end

  self:log(ERR, "failed to add key '", key, "': ", err)
  -- hand the error back as well, as promised by the documented contract
  return nil, err
end
--- Active health check callback function.
-- @param premature default openresty param
-- @param self the checker object this timer runs on
-- @param health_mode either "healthy" or "unhealthy" to indicate what check
local function checker_callback(premature, self, health_mode)
if premature or self.stopping then
self.timer_count = self.timer_count - 1
return
end
local interval
if not self:get_periodic_lock(health_mode) then
-- another worker just ran, or is running the healthcheck
interval = self.checks.active[health_mode].interval
else
-- we're elected to run the active healthchecks
-- create a list of targets to check, here we can still do this atomically
local start_time = ngx_now()
local list_to_check = {}
local targets = fetch_target_list(self)
for _, target in ipairs(targets) do
local tgt = get_target(self, target.ip, target.port, target.hostname)
local internal_health = tgt and tgt.internal_health or nil
if (health_mode == "healthy" and (internal_health == "healthy" or
internal_health == "mostly_healthy"))
or (health_mode == "unhealthy" and (internal_health == "unhealthy" or
internal_health == "mostly_unhealthy"))