diff --git a/apisix/consumer.lua b/apisix/consumer.lua index 31dfa05c80f3..35f01329d82b 100644 --- a/apisix/consumer.lua +++ b/apisix/consumer.lua @@ -74,6 +74,15 @@ function _M.plugin(plugin_name) end +function _M.consumers() + if not consumers then + return nil, nil + end + + return consumers.values, consumers.conf_version +end + + function _M.init_worker() local err consumers, err = core.config.new("/consumers", { diff --git a/apisix/core/config_etcd.lua b/apisix/core/config_etcd.lua index fbb6df8dbf5d..6e616b08f906 100644 --- a/apisix/core/config_etcd.lua +++ b/apisix/core/config_etcd.lua @@ -49,6 +49,26 @@ local mt = { end } + +local function getkey(etcd_cli, key) + if not etcd_cli then + return nil, "not inited" + end + + local res, err = etcd_cli:get(key) + if not res then + -- log.error("failed to get key from etcd: ", err) + return nil, err + end + + if type(res.body) ~= "table" then + return nil, "failed to get key from etcd" + end + + return res +end + + local function readdir(etcd_cli, key) if not etcd_cli then return nil, nil, "not inited" @@ -67,12 +87,12 @@ local function readdir(etcd_cli, key) return res end -local function waitdir(etcd_cli, key, modified_index) +local function waitdir(etcd_cli, key, modified_index, timeout) if not etcd_cli then return nil, nil, "not inited" end - local res, err = etcd_cli:waitdir(key, modified_index) + local res, err = etcd_cli:waitdir(key, modified_index, timeout) if not res then -- log.error("failed to get key from etcd: ", err) return nil, err @@ -201,9 +221,25 @@ local function sync_data(self) return true end - local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1) + -- for fetch the etcd index + local key_res, _ = getkey(self.etcd_cli, self.key) + + local dir_res, err = waitdir(self.etcd_cli, self.key, self.prev_index + 1, self.timeout) + log.info("waitdir key: ", self.key, " prev_index: ", self.prev_index + 1) log.info("res: ", json.delay_encode(dir_res, true)) + if err == "timeout" then + if key_res and key_res.headers then + local key_index = key_res.headers["X-Etcd-Index"] + local key_idx = key_index and tonumber(key_index) or 0 + if key_idx and key_idx > self.prev_index then + -- Avoid the index to exceed 1000 by updating other keys + -- that will causing a full reload + self:upgrade_version(key_index) + end + end + end + if not dir_res then return false, err end @@ -330,6 +366,15 @@ function _M.get(self, key) end +function _M.getkey(self, key) + if not self.running then + return nil, "stoped" + end + + return getkey(self.etcd_cli, key) +end + + local function _automatic_fetch(premature, self) if premature then return @@ -395,6 +440,7 @@ function _M.new(key, opts) local automatic = opts and opts.automatic local item_schema = opts and opts.item_schema local filter_fun = opts and opts.filter + local timeout = opts and opts.timeout local obj = setmetatable({ etcd_cli = etcd_cli, @@ -410,6 +456,7 @@ function _M.new(key, opts) prev_index = nil, last_err = nil, last_err_time = nil, + timeout = timeout, filter = filter_fun, }, mt) diff --git a/apisix/http/router/radixtree_sni.lua b/apisix/http/router/radixtree_sni.lua index d042aeedc957..07c689fa8033 100644 --- a/apisix/http/router/radixtree_sni.lua +++ b/apisix/http/router/radixtree_sni.lua @@ -204,6 +204,15 @@ function _M.match_and_set(api_ctx) end +function _M.ssls() + if not ssl_certificates then + return nil, nil + end + + return ssl_certificates.values, ssl_certificates.conf_version +end + + function _M.init_worker() local err ssl_certificates, err = core.config.new("/ssl", { diff --git a/apisix/plugins/grpc-transcode/proto.lua b/apisix/plugins/grpc-transcode/proto.lua index 13f3060b1cd8..09240fa5a5b3 100644 --- a/apisix/plugins/grpc-transcode/proto.lua +++ b/apisix/plugins/grpc-transcode/proto.lua @@ -63,6 +63,15 @@ function _M.fetch(proto_id) end +function _M.protos() + if not protos then + return nil, nil + end + + return protos.values, protos.conf_version +end + + function _M.init() local err protos, err = core.config.new("/proto", { diff --git a/apisix/plugins/prometheus/exporter.lua b/apisix/plugins/prometheus/exporter.lua index 2c0b6fc618ab..7e3d63eae7da 100644 --- a/apisix/plugins/prometheus/exporter.lua +++ b/apisix/plugins/prometheus/exporter.lua @@ -22,7 +22,19 @@ local ngx_capture = ngx.location.capture local re_gmatch = ngx.re.gmatch local tonumber = tonumber local select = select +local type = type local prometheus +local router = require("apisix.router") +local get_routes = router.http_routes +local get_ssls = router.ssls +local get_services = require("apisix.http.service").services +local get_consumers = require("apisix.consumer").consumers +local get_upstreams = require("apisix.upstream").upstreams +local clear_tab = core.table.clear +local get_stream_routes = router.stream_routes +local get_protos = require("apisix.plugins.grpc-transcode.proto").protos + + -- Default set of latency buckets, 1ms to 60s: local DEFAULT_BUCKETS = { 1, 2, 5, 7, 10, 15, 20, 25, 30, 40, 50, 60, 70, @@ -34,7 +46,6 @@ local metrics = {} local inner_tab_arr = {} - local clear_tab = core.table.clear local function gen_arr(...) clear_tab(inner_tab_arr) @@ -56,7 +67,7 @@ function _M.init() return end - core.table.clear(metrics) + clear_tab(metrics) -- across all services prometheus = base_prometheus.init("prometheus-metrics", "apisix_") @@ -67,6 +78,10 @@ function _M.init() metrics.etcd_reachable = prometheus:gauge("etcd_reachable", "Config server etcd reachable from APISIX, 0 is unreachable") + metrics.etcd_modify_indexes = prometheus:gauge("etcd_modify_indexes", + "Etcd modify index for APISIX keys", + {"key"}) + -- per service metrics.status = prometheus:counter("http_status", "HTTP status codes per service in APISIX", @@ -152,13 +167,90 @@ local function nginx_status() label_values[1] = name metrics.connections:set(val[0], label_values) + end end +local key_values = {} +local function set_modify_index(key, items, items_ver, global_max_index) + clear_tab(key_values) + local max_idx = 0 + if items_ver and items then + for _, item in ipairs(items) do + if type(item) == "table" and item.modifiedIndex > max_idx then + max_idx = item.modifiedIndex + end + end + end + + key_values[1] = key + metrics.etcd_modify_indexes:set(max_idx, key_values) + + + global_max_index = max_idx > global_max_index and max_idx or global_max_index + + return global_max_index +end + + +local function etcd_modify_index() + clear_tab(key_values) + local global_max_idx = 0 + + -- routes + local routes, routes_ver = get_routes() + global_max_idx = set_modify_index("routes", routes, routes_ver, global_max_idx) + + -- services + local services, services_ver = get_services() + global_max_idx = set_modify_index("services", services, services_ver, global_max_idx) + + -- ssls + local ssls, ssls_ver = get_ssls() + global_max_idx = set_modify_index("ssls", ssls, ssls_ver, global_max_idx) + + -- consumers + local consumers, consumers_ver = get_consumers() + global_max_idx = set_modify_index("consumers", consumers, consumers_ver, global_max_idx) + + -- global_rules + local global_rules = router.global_rules + if global_rules then + global_max_idx = set_modify_index("global_rules", global_rules.values, + global_rules.conf_version, global_max_idx) + + -- prev_index + key_values[1] = "prev_index" + metrics.etcd_modify_indexes:set(global_rules.prev_index, key_values) + + else + global_max_idx = set_modify_index("global_rules", nil, nil, global_max_idx) + end + + -- upstreams + local upstreams, upstreams_ver = get_upstreams() + global_max_idx = set_modify_index("upstreams", upstreams, upstreams_ver, global_max_idx) + + -- stream_routes + local stream_routes, stream_routes_ver = get_stream_routes() + global_max_idx = set_modify_index("stream_routes", stream_routes, + stream_routes_ver, global_max_idx) + + -- proto + local protos, protos_ver = get_protos() + global_max_idx = set_modify_index("protos", protos, protos_ver, global_max_idx) + + -- global max + key_values[1] = "max_modify_index" + metrics.etcd_modify_indexes:set(global_max_idx, key_values) + +end + + function _M.collect() if not prometheus or not metrics then - core.log.err("prometheus: plugin is not initialized, please make sure ", + core.log.error("prometheus: plugin is not initialized, please make sure ", " 'prometheus_metrics' shared dict is present in nginx template") return 500, {message = "An unexpected error occurred"} end @@ -166,6 +258,9 @@ function _M.collect() -- across all services nginx_status() + -- etcd modify index + etcd_modify_index() + -- config server status local config = core.config.new() local version, err = config:server_version() @@ -178,6 +273,14 @@ function _M.collect() "processing metrics endpoint: ", err) end + local res, _ = config:getkey("/routes") + if res and res.headers then + clear_tab(key_values) + -- global max + key_values[1] = "x_etcd_index" + metrics.etcd_modify_indexes:set(res.headers["X-Etcd-Index"], key_values) + end + core.response.set_header("content_type", "text/plain") return 200, core.table.concat(prometheus:metric_data()) end diff --git a/apisix/router.lua b/apisix/router.lua index 4ba870993717..b94be0c42f42 100644 --- a/apisix/router.lua +++ b/apisix/router.lua @@ -103,10 +103,22 @@ function _M.stream_init_worker() end +function _M.ssls() + return _M.router_ssl.ssls() +end + function _M.http_routes() return _M.router_http.routes() end +function _M.stream_routes() + -- maybe it's not inited. + if not _M.router_stream then + return nil, nil + end + return _M.router_stream.routes() +end + -- for test _M.filter_test = filter diff --git a/apisix/schema_def.lua b/apisix/schema_def.lua index d580be6982ed..34957ad8ae39 100644 --- a/apisix/schema_def.lua +++ b/apisix/schema_def.lua @@ -463,6 +463,7 @@ _M.service = { _M.consumer = { type = "object", properties = { + id = id_schema, username = { type = "string", minLength = 1, maxLength = 32, pattern = [[^[a-zA-Z0-9_]+$]] diff --git a/t/core/etcd-sync.t b/t/core/etcd-sync.t new file mode 100644 index 000000000000..ef0ea57cdf5b --- /dev/null +++ b/t/core/etcd-sync.t @@ -0,0 +1,96 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +no_root_location(); + +run_tests; + +__DATA__ + +=== TEST 1: auto update prev_index when other keys update +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + + local consumers, _ = core.config.new("/consumers", { + automatic = true, + item_schema = core.schema.consumer, + timeout = 0.2 + }) + + ngx.sleep(0.6) + + local idx = consumers.prev_index + local key = "/test_key" + local val = "test_value" + core.etcd.set(key, val) + + ngx.sleep(2) + + local new_idx = consumers.prev_index + + if new_idx > idx then + ngx.say("prev_index updated") + end + } + } +--- request +GET /t +--- response_body +prev_index updated +--- no_error_log +[error] + + + +=== TEST 2: using default timeout +--- config + location /t { + content_by_lua_block { + local core = require("apisix.core") + + local consumers, _ = core.config.new("/consumers", { + automatic = true, + item_schema = core.schema.consumer + }) + + ngx.sleep(0.6) + local idx = consumers.prev_index + + local key = "/test_key" + local val = "test_value" + core.etcd.set(key, val) + + ngx.sleep(2) + + local new_idx = consumers.prev_index + + if new_idx > idx then + ngx.say("prev_index updated") + else + ngx.say("prev_index not update") + end + } + } +--- request +GET /t +--- response_body +prev_index not update +--- no_error_log +[error] diff --git a/t/plugin/prometheus.t b/t/plugin/prometheus.t index 4e99b84d4a95..3a7d5c7584e1 100644 --- a/t/plugin/prometheus.t +++ b/t/plugin/prometheus.t @@ -663,3 +663,113 @@ GET /t passed --- no_error_log [error] + + + +=== TEST 32: fetch the prometheus metric data with `modify_indexes consumers` +--- request +GET /apisix/prometheus/metrics +--- response_body_like eval +qr/apisix_etcd_modify_indexes\{key="consumers"\} \d+/ +--- no_error_log +[error] + + + +=== TEST 33: fetch the prometheus metric data with `modify_indexes global_rules` +--- request +GET /apisix/prometheus/metrics +--- response_body_like eval +qr/apisix_etcd_modify_indexes\{key="global_rules"\} \d+/ +--- no_error_log +[error] + + + +=== TEST 34: fetch the prometheus metric data with `modify_indexes max_modify_index` +--- request +GET /apisix/prometheus/metrics +--- response_body_like eval +qr/apisix_etcd_modify_indexes\{key="max_modify_index"\} \d+/ +--- no_error_log +[error] + + + +=== TEST 35: fetch the prometheus metric data with `modify_indexes protos` +--- request +GET /apisix/prometheus/metrics +--- response_body_like eval +qr/apisix_etcd_modify_indexes\{key="protos"\} \d+/ +--- no_error_log +[error] + + + +=== TEST 36: fetch the prometheus metric data with `modify_indexes routes` +--- request +GET /apisix/prometheus/metrics +--- response_body_like eval +qr/apisix_etcd_modify_indexes\{key="routes"\} \d+/ +--- no_error_log +[error] + + + +=== TEST 37: fetch the prometheus metric data with `modify_indexes services` +--- request +GET /apisix/prometheus/metrics +--- response_body_like eval +qr/apisix_etcd_modify_indexes\{key="services"\} \d+/ +--- no_error_log +[error] + + + +=== TEST 38: fetch the prometheus metric data with `modify_indexes ssls` +--- request +GET /apisix/prometheus/metrics +--- response_body_like eval +qr/apisix_etcd_modify_indexes\{key="ssls"\} \d+/ +--- no_error_log +[error] + + + +=== TEST 39: fetch the prometheus metric data with `modify_indexes stream_routes` +--- request +GET /apisix/prometheus/metrics +--- response_body_like eval +qr/apisix_etcd_modify_indexes\{key="stream_routes"\} \d+/ +--- no_error_log +[error] + + + +=== TEST 40: fetch the prometheus metric data with `modify_indexes upstreams` +--- request +GET /apisix/prometheus/metrics +--- response_body_like eval +qr/apisix_etcd_modify_indexes\{key="upstreams"\} \d+/ +--- no_error_log +[error] + + + +=== TEST 41: fetch the prometheus metric data with `modify_indexes prev_index` +--- request +GET /apisix/prometheus/metrics +--- response_body_like eval +qr/apisix_etcd_modify_indexes\{key="prev_index"\} \d+/ +--- no_error_log +[error] + + + +=== TEST 42: fetch the prometheus metric data with `modify_indexes x_etcd_index` +--- request +GET /apisix/prometheus/metrics +--- response_body_like eval +qr/apisix_etcd_modify_indexes\{key="x_etcd_index"\} \d+/ +--- no_error_log +[error]