From 231d839ac86d85557eed4edf2644d865f7acfa8a Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 30 Oct 2023 21:18:03 +0800 Subject: [PATCH 01/12] refine lac avg speed Signed-off-by: guo-shaoge --- .../LocalAdmissionController.cpp | 14 ++++- .../LocalAdmissionController.h | 58 +++++++++++++------ .../src/Flash/ResourceControl/TokenBucket.cpp | 29 ---------- dbms/src/Flash/ResourceControl/TokenBucket.h | 16 +---- 4 files changed, 52 insertions(+), 65 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp index 686dae867e1..f5fe5fccc26 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp @@ -167,15 +167,20 @@ std::optional LocalAdmissionControll const ResourceGroupPtr & resource_group, bool is_periodically_fetch) { - double token_consumption = resource_group->getAndCleanConsumptionDelta(); + double token_consumption = 0.0; double acquire_tokens = 0.0; + const auto now = std::chrono::steady_clock::now(); + + const auto consumption_update_info + = resource_group->updateConsumptionSpeedInfoIfNecessary(now, DEFAULT_FETCH_GAC_INTERVAL); + if (consumption_update_info.updated) + token_consumption = consumption_update_info.delta; auto get_acquire_tokens = [&]() { if (resource_group->burstable) return; // To avoid periodically_token_fetch after low_token_fetch immediately - const auto now = std::chrono::steady_clock::now(); if (is_periodically_fetch && !resource_group->needFetchTokenPeridically(now, DEFAULT_FETCH_GAC_INTERVAL)) return; @@ -183,7 +188,10 @@ std::optional LocalAdmissionControll if (resource_group->inTrickleModeLease(now)) return; - acquire_tokens = resource_group->getAcquireRUNum(DEFAULT_FETCH_GAC_INTERVAL.count(), ACQUIRE_RU_AMPLIFICATION); + acquire_tokens = resource_group->getAcquireRUNum( + consumption_update_info.speed, + DEFAULT_FETCH_GAC_INTERVAL.count(), + ACQUIRE_RU_AMPLIFICATION); assert(acquire_tokens >= 0.0); }; diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index d1363fd8171..43b55b405af 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -175,23 +175,21 @@ class ResourceGroup final : private boost::noncopyable } // Return how many tokens should acquire from GAC for the next n seconds. - double getAcquireRUNum(uint32_t n, double amplification) const + double getAcquireRUNum(double speed, uint32_t n_sec, double amplification) const { assert(amplification >= 1.0); - double avg_speed = 0.0; double remaining_ru = 0.0; double base = 0.0; { std::lock_guard lock(mu); - avg_speed = bucket->getAvgSpeedPerSec(); remaining_ru = bucket->peek(); base = static_cast(user_ru_per_sec); } // Appropriate amplification is necessary to prevent situation that GAC has sufficient RU, // but user query speed is limited due to LAC requests too few RU. - double acquire_num = avg_speed * n * amplification; + double acquire_num = speed * n_sec * amplification; // Prevent avg_speed from being 0 due to RU exhaustion. if (acquire_num == 0.0 && remaining_ru <= 0.0) @@ -205,7 +203,7 @@ class ResourceGroup final : private boost::noncopyable "acquire num for rg {}: avg_speed: {}, remaining_ru: {}, base: {}, amplification: {}, " "acquire num: {}", name, - avg_speed, + speed, remaining_ru, base, amplification, @@ -296,13 +294,37 @@ class ResourceGroup final : private boost::noncopyable bucket->toString()); } - double getAndCleanConsumptionDelta() + struct ConsumptionUpdateInfo + { + // Avg speed of RU consumption of time range [last_update_ru_consumption_timepoint, now]. + double speed; + // RU consumption since last_update_ru_consumption_timepoint. + double delta; + // If speed or delta is updated or not. + bool updated; + }; + + ConsumptionUpdateInfo updateConsumptionSpeedInfoIfNecessary( + const std::chrono::steady_clock::time_point & now, + const std::chrono::seconds & dura) { std::lock_guard lock(mu); - auto ori = ru_consumption_delta; - ru_consumption_delta = 0.0; - total_consumption += ori; - return ori; + const auto elapsed + = std::chrono::duration_cast(now - last_update_ru_consumption_timepoint); + + if (elapsed < dura) + return {.speed = ru_consumption_speed, .delta = ru_consumption_delta, .updated = false}; + + ru_consumption_speed = ru_consumption_delta / elapsed.count(); + ConsumptionUpdateInfo info = {ru_consumption_speed, ru_consumption_delta, true}; + + last_update_ru_consumption_timepoint = now; + total_ru_consumption += ru_consumption_delta; + ru_consumption_delta = 0; + + GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_avg_speed, name).Set(ru_consumption_speed); + GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_total_consumption, name).Set(total_ru_consumption); + return info; } bool needFetchTokenPeridically(const std::chrono::steady_clock::time_point & now, const std::chrono::seconds & dura) @@ -328,11 +350,10 @@ class ResourceGroup final : private boost::noncopyable void collectMetrics() const { + // todo maybe put it where metric change std::lock_guard lock(mu); const auto & config = bucket->getConfig(); GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_remaining_tokens, name).Set(config.tokens); - GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_avg_speed, name).Set(bucket->getAvgSpeedPerSec()); - GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_total_consumption, name).Set(total_consumption); GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_bucket_fill_rate, name).Set(config.fill_rate); GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_bucket_capacity, name).Set(config.capacity); GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_fetch_tokens_from_gac_count, name) @@ -358,15 +379,16 @@ class ResourceGroup final : private boost::noncopyable TokenBucketMode bucket_mode = TokenBucketMode::normal_mode; - double ru_consumption_delta = 0.0; - LoggerPtr log; - std::chrono::time_point last_fetch_tokens_from_gac_timepoint - = std::chrono::steady_clock::now(); - std::chrono::time_point stop_trickle_timepoint = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point last_fetch_tokens_from_gac_timepoint = std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point stop_trickle_timepoint = std::chrono::steady_clock::now(); uint64_t fetch_tokens_from_gac_count = 0; - double total_consumption = 0.0; + double total_ru_consumption = 0.0; + + double ru_consumption_delta = 0.0; + double ru_consumption_speed = 0.0; + std::chrono::steady_clock::time_point last_update_ru_consumption_timepoint = std::chrono::steady_clock::now(); }; using ResourceGroupPtr = std::shared_ptr; diff --git a/dbms/src/Flash/ResourceControl/TokenBucket.cpp b/dbms/src/Flash/ResourceControl/TokenBucket.cpp index 6a255db8b43..684ebb5270c 100644 --- a/dbms/src/Flash/ResourceControl/TokenBucket.cpp +++ b/dbms/src/Flash/ResourceControl/TokenBucket.cpp @@ -53,35 +53,6 @@ void TokenBucket::reConfig(const TokenBucketConfig & config) capacity = config.capacity; compact(now); - // Update because token number may increase, which may cause token_changed be negative. - last_get_avg_speed_tokens = tokens; - last_get_avg_speed_timepoint = std::chrono::steady_clock::now(); -} - -double TokenBucket::getAvgSpeedPerSec() -{ - auto now = std::chrono::steady_clock::now(); - RUNTIME_CHECK(now >= last_get_avg_speed_timepoint); - auto dura = std::chrono::duration_cast(now - last_get_avg_speed_timepoint); - - compact(now); - double token_changed = last_get_avg_speed_tokens - tokens; - - // If dura less than 1 sec, return last sec avg speed. - if (dura.count() >= 1) - { - avg_speed_per_sec = token_changed / dura.count(); - last_get_avg_speed_tokens = tokens; - last_get_avg_speed_timepoint = now; - } - LOG_TRACE( - log, - "getAvgSpeedPerSec dura: {}, last_get_avg_speed_tokens: {}, cur tokens: {}, avg_speed_per_sec: {}", - dura.count(), - last_get_avg_speed_tokens, - tokens, - avg_speed_per_sec); - return avg_speed_per_sec; } void TokenBucket::compact(const TokenBucket::TimePoint & timepoint) diff --git a/dbms/src/Flash/ResourceControl/TokenBucket.h b/dbms/src/Flash/ResourceControl/TokenBucket.h index a363a095024..3aafbdcc32a 100644 --- a/dbms/src/Flash/ResourceControl/TokenBucket.h +++ b/dbms/src/Flash/ResourceControl/TokenBucket.h @@ -43,9 +43,6 @@ class TokenBucket final , tokens(init_tokens_) , capacity(capacity_) , last_compact_timepoint(std::chrono::steady_clock::now()) - , last_get_avg_speed_timepoint(std::chrono::steady_clock::now()) - , last_get_avg_speed_tokens(init_tokens_) - , avg_speed_per_sec(0.0) , low_token_threshold(LOW_TOKEN_THRESHOLD_RATE * capacity_) , log(Logger::get(log_id)) {} @@ -83,8 +80,6 @@ class TokenBucket final return {tokens, fill_rate, capacity}; } - double getAvgSpeedPerSec(); - bool lowToken() const { return peek() <= low_token_threshold; } bool isStatic() const { return fill_rate == 0.0; } @@ -92,12 +87,7 @@ class TokenBucket final std::string toString() const { FmtBuffer fmt_buf; - fmt_buf.fmtAppend( - "tokens: {}, fill_rate: {}, capacity: {}, avg_speed_per_sec: {}", - tokens, - fill_rate, - capacity, - avg_speed_per_sec); + fmt_buf.fmtAppend("tokens: {}, fill_rate: {}, capacity: {}", tokens, fill_rate, capacity); return fmt_buf.toString(); } @@ -116,10 +106,6 @@ class TokenBucket final TimePoint last_compact_timepoint; - TimePoint last_get_avg_speed_timepoint; - double last_get_avg_speed_tokens; - double avg_speed_per_sec; - double low_token_threshold; LoggerPtr log; From 443810d38cb7205623ec691f9fa999b3bf483afb Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 30 Oct 2023 21:33:45 +0800 Subject: [PATCH 02/12] del getAvgSpeed() call Signed-off-by: guo-shaoge --- dbms/src/Flash/ResourceControl/LocalAdmissionController.h | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index 43b55b405af..b0481cbfc15 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -280,11 +280,10 @@ class ResourceGroup final : private boost::noncopyable return; bucket_mode = TokenBucketMode::degrade_mode; - double avg_speed = bucket->getAvgSpeedPerSec(); auto config = bucket->getConfig(); std::string ori_bucket_info = bucket->toString(); - config.fill_rate = avg_speed; + config.fill_rate = ru_consumption_speed; bucket->reConfig(config); LOG_INFO( log, From 9c95d04dd7bbe58f25adce0f8ac86caddcf1d6cf Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Mon, 30 Oct 2023 22:07:16 +0800 Subject: [PATCH 03/12] lac trickle stop Signed-off-by: guo-shaoge --- .../Flash/ResourceControl/LocalAdmissionController.cpp | 7 +++++++ .../Flash/ResourceControl/LocalAdmissionController.h | 10 ++++++++-- 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp index f5fe5fccc26..763d8b39f6e 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp @@ -192,6 +192,13 @@ std::optional LocalAdmissionControll consumption_update_info.speed, DEFAULT_FETCH_GAC_INTERVAL.count(), ACQUIRE_RU_AMPLIFICATION); + + if (acquire_tokens == 0.0 && resource_group->needNotifyStopTrickleMode(now)) + { + LOG_DEBUG(log, "force notify stop trickle mode"); + acquire_tokens = 1.0; + } + assert(acquire_tokens >= 0.0); }; diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index b0481cbfc15..723d287f113 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -347,6 +347,12 @@ class ResourceGroup final : private boost::noncopyable return bucket_mode == trickle_mode && tp < stop_trickle_timepoint; } + bool needNotifyStopTrickleMode(const std::chrono::steady_clock::time_point & tp) + { + std::lock_guard lock(mu); + return bucket_mode == trickle_mode && tp >= stop_trickle_timepoint; + } + void collectMetrics() const { // todo maybe put it where metric change @@ -430,7 +436,7 @@ class LocalAdmissionController final : private boost::noncopyable } group->consumeResource(ru, cpu_time_in_ns); - if (group->lowToken()) + if (group->lowToken() || group->needNotifyStopTrickleMode(std::chrono::steady_clock::now())) { { std::lock_guard lock(mu); @@ -597,7 +603,7 @@ class LocalAdmissionController final : private boost::noncopyable // Utilities for fetch token from GAC. void fetchTokensForLowTokenResourceGroups(); void fetchTokensForAllResourceGroups(); - static std::optional buildAcquireInfo( + std::optional buildAcquireInfo( const ResourceGroupPtr & resource_group, bool is_periodically_fetch); From 4e4b3a7d5b9b12fff9795a1e4022750e0d477aba Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Tue, 31 Oct 2023 11:10:35 +0800 Subject: [PATCH 04/12] ACQUIRE_RU_AMPLIFICATION to 1.5 Signed-off-by: guo-shaoge --- dbms/src/Flash/ResourceControl/LocalAdmissionController.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index 723d287f113..5e707868c97 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -537,7 +537,7 @@ class LocalAdmissionController final : private boost::noncopyable static constexpr auto DEGRADE_MODE_DURATION = std::chrono::seconds(120); static constexpr auto TARGET_REQUEST_PERIOD_MS = std::chrono::milliseconds(5000); static constexpr auto COLLECT_METRIC_INTERVAL = std::chrono::seconds(5); - static constexpr double ACQUIRE_RU_AMPLIFICATION = 1.1; + static constexpr double ACQUIRE_RU_AMPLIFICATION = 1.5; static const std::string GAC_RESOURCE_GROUP_ETCD_PATH; static const std::string WATCH_GAC_ERR_PREFIX; From adfffa4061a0598057a2392ebf004bc6e9045e78 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 1 Nov 2023 11:58:22 +0800 Subject: [PATCH 05/12] refine log Signed-off-by: guo-shaoge --- .../src/Flash/ResourceControl/LocalAdmissionController.cpp | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp index 763d8b39f6e..3a9e9680ef5 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp @@ -193,9 +193,12 @@ std::optional LocalAdmissionControll DEFAULT_FETCH_GAC_INTERVAL.count(), ACQUIRE_RU_AMPLIFICATION); - if (acquire_tokens == 0.0 && resource_group->needNotifyStopTrickleMode(now)) + if (acquire_tokens == 0.0 && token_consumption == 0.0 && resource_group->needNotifyStopTrickleMode(now)) { - LOG_DEBUG(log, "force notify stop trickle mode"); + // If acquire_tokens and token_consumption are both zero, will ignore send RPC to GAC. + // But we need to make sure trickle mode should exit timely, which needs to talk with GAC. + // So we force acquire 1RU. + LOG_DEBUG(log, "force acquire 1RU because of try to exit trickle mode"); acquire_tokens = 1.0; } From 74c6e524e13869044160d9c4892cc8db3bec033f Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 1 Nov 2023 12:07:05 +0800 Subject: [PATCH 06/12] report last ru consumption Signed-off-by: guo-shaoge --- .../LocalAdmissionController.h | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index 5e707868c97..83bfd46b0fc 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -434,6 +434,25 @@ class LocalAdmissionController final : private boost::noncopyable LOG_INFO(log, "cannot consume ru for {}, maybe it has been deleted", name); return; } + LOG_DEBUG(log, "background threads joined"); + + // Report final RU consumption before stop: + // 1. to avoid RU consumption omission. + // 2. clear GAC's unique_client_id to avoid affecting burst limit calculation. + // This can happend when disagg CN is scaled-in/out frequently. + std::vector acquire_infos; + for (const auto & resource_group : resource_groups) + { + const auto consumption_update_info = resource_group.second->updateConsumptionSpeedInfoIfNecessary( + std::chrono::steady_clock::time_point::max(), + std::chrono::seconds(0)); + assert(consumption_update_info.updated); + if (consumption_update_info.delta == 0.0) + continue; + acquire_infos.push_back( + {.resource_group_name = resource_group.first, .ru_consumption_delta = consumption_update_info.delta}); + } + fetchTokensFromGAC(acquire_infos, "before stop"); group->consumeResource(ru, cpu_time_in_ns); if (group->lowToken() || group->needNotifyStopTrickleMode(std::chrono::steady_clock::now())) From 118b67e5415882b20b9eb3849db8f72dbff95b4a Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 1 Nov 2023 12:52:08 +0800 Subject: [PATCH 07/12] refine Signed-off-by: guo-shaoge --- dbms/src/Flash/ResourceControl/LocalAdmissionController.h | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index 83bfd46b0fc..f28991fa1d7 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -434,7 +434,6 @@ class LocalAdmissionController final : private boost::noncopyable LOG_INFO(log, "cannot consume ru for {}, maybe it has been deleted", name); return; } - LOG_DEBUG(log, "background threads joined"); // Report final RU consumption before stop: // 1. to avoid RU consumption omission. @@ -450,7 +449,9 @@ class LocalAdmissionController final : private boost::noncopyable if (consumption_update_info.delta == 0.0) continue; acquire_infos.push_back( - {.resource_group_name = resource_group.first, .ru_consumption_delta = consumption_update_info.delta}); + {.resource_group_name = resource_group.first, + .acquire_tokens = 0, + .ru_consumption_delta = consumption_update_info.delta}); } fetchTokensFromGAC(acquire_infos, "before stop"); From 5c9ed625624ae4275076dc9dee39f7ba9736af3c Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Wed, 1 Nov 2023 13:16:28 +0800 Subject: [PATCH 08/12] refine metric with lock Signed-off-by: guo-shaoge --- .../LocalAdmissionController.h | 57 ++++++++++++------- dbms/src/Flash/ResourceControl/TokenBucket.h | 6 ++ 2 files changed, 41 insertions(+), 22 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index f28991fa1d7..7555bf85f05 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -296,33 +296,42 @@ class ResourceGroup final : private boost::noncopyable struct ConsumptionUpdateInfo { // Avg speed of RU consumption of time range [last_update_ru_consumption_timepoint, now]. - double speed; + double speed = 0.0; // RU consumption since last_update_ru_consumption_timepoint. - double delta; + double delta = 0.0; + // Total RU consumption of all time. + double total = 0.0; // If speed or delta is updated or not. - bool updated; + bool updated = false; }; ConsumptionUpdateInfo updateConsumptionSpeedInfoIfNecessary( const std::chrono::steady_clock::time_point & now, const std::chrono::seconds & dura) { - std::lock_guard lock(mu); - const auto elapsed - = std::chrono::duration_cast(now - last_update_ru_consumption_timepoint); + ConsumptionUpdateInfo info; + { + std::lock_guard lock(mu); + const auto elapsed + = std::chrono::duration_cast(now - last_update_ru_consumption_timepoint); - if (elapsed < dura) - return {.speed = ru_consumption_speed, .delta = ru_consumption_delta, .updated = false}; + if (elapsed < dura) + return {.speed = ru_consumption_speed, .delta = ru_consumption_delta, .updated = false}; - ru_consumption_speed = ru_consumption_delta / elapsed.count(); - ConsumptionUpdateInfo info = {ru_consumption_speed, ru_consumption_delta, true}; + ru_consumption_speed = ru_consumption_delta / elapsed.count(); + total_ru_consumption += ru_consumption_delta; - last_update_ru_consumption_timepoint = now; - total_ru_consumption += ru_consumption_delta; - ru_consumption_delta = 0; + info.speed = ru_consumption_speed; + info.total = total_ru_consumption; + info.delta = ru_consumption_delta; + info.updated = true; + + ru_consumption_delta = 0; + last_update_ru_consumption_timepoint = now; + } - GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_avg_speed, name).Set(ru_consumption_speed); - GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_total_consumption, name).Set(total_ru_consumption); + GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_avg_speed, name).Set(info.speed); + GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_total_consumption, name).Set(info.total); return info; } @@ -355,14 +364,18 @@ class ResourceGroup final : private boost::noncopyable void collectMetrics() const { - // todo maybe put it where metric change - std::lock_guard lock(mu); - const auto & config = bucket->getConfig(); - GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_remaining_tokens, name).Set(config.tokens); - GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_bucket_fill_rate, name).Set(config.fill_rate); - GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_bucket_capacity, name).Set(config.capacity); + TokenBucket::TokenBucketConfig local_config; + uint64_t local_fetch_count = 0; + { + std::lock_guard lock(mu); + local_config = bucket->getConfig(); + local_fetch_count = fetch_tokens_from_gac_count; + } + GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_remaining_tokens, name).Set(local_config.tokens); + GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_bucket_fill_rate, name).Set(local_config.fill_rate); + GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_bucket_capacity, name).Set(local_config.capacity); GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_fetch_tokens_from_gac_count, name) - .Set(fetch_tokens_from_gac_count); + .Set(local_fetch_count); } const std::string name; diff --git a/dbms/src/Flash/ResourceControl/TokenBucket.h b/dbms/src/Flash/ResourceControl/TokenBucket.h index 3aafbdcc32a..c5367e25e4d 100644 --- a/dbms/src/Flash/ResourceControl/TokenBucket.h +++ b/dbms/src/Flash/ResourceControl/TokenBucket.h @@ -51,6 +51,12 @@ class TokenBucket final struct TokenBucketConfig { + TokenBucketConfig() + : tokens(0.0) + , fill_rate(0.0) + , capacity(0.0) + {} + TokenBucketConfig(double tokens_, double fill_rate_, double capacity_) : tokens(tokens_) , fill_rate(fill_rate_) From c18b0fa93808c5711d271ba9c304e6dcecd92b83 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 2 Nov 2023 09:17:35 +0800 Subject: [PATCH 09/12] refine code Signed-off-by: guo-shaoge --- .../LocalAdmissionController.cpp | 14 +++++----- .../LocalAdmissionController.h | 27 +++++++++---------- dbms/src/Flash/ResourceControl/TokenBucket.h | 4 +-- 3 files changed, 21 insertions(+), 24 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp index 3a9e9680ef5..626caf7c647 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp @@ -77,7 +77,7 @@ void LocalAdmissionController::startBackgroudJob() } LOG_INFO(log, "get unique_client_id succeed: {}", unique_client_id); - auto last_metric_time_point = std::chrono::steady_clock::now(); + auto last_metric_time_point = SteadyClock::now(); while (!stopped.load()) { bool fetch_token_periodically = false; @@ -85,7 +85,7 @@ void LocalAdmissionController::startBackgroudJob() { std::unique_lock lock(mu); - auto now = std::chrono::steady_clock::now(); + auto now = SteadyClock::now(); if (now - last_metric_time_point >= COLLECT_METRIC_INTERVAL) { last_metric_time_point = now; @@ -169,7 +169,7 @@ std::optional LocalAdmissionControll { double token_consumption = 0.0; double acquire_tokens = 0.0; - const auto now = std::chrono::steady_clock::now(); + const auto now = SteadyClock::now(); const auto consumption_update_info = resource_group->updateConsumptionSpeedInfoIfNecessary(now, DEFAULT_FETCH_GAC_INTERVAL); @@ -224,7 +224,7 @@ void LocalAdmissionController::fetchTokensFromGAC( { // In theory last_fetch_tokens_from_gac_timepoint should only be updated when network to GAC is ok, // but we still update here to avoid resource groups that has enough RU goto degrade mode. - last_fetch_tokens_from_gac_timepoint = std::chrono::steady_clock::now(); + last_fetch_tokens_from_gac_timepoint = SteadyClock::now(); return; } @@ -292,7 +292,7 @@ void LocalAdmissionController::fetchTokensFromGAC( void LocalAdmissionController::checkDegradeMode() { - auto now = std::chrono::steady_clock::now(); + auto now = SteadyClock::now(); std::lock_guard lock(mu); if ((now - last_fetch_tokens_from_gac_timepoint) >= DEGRADE_MODE_DURATION) { @@ -316,7 +316,8 @@ std::vector LocalAdmissionController::handleTokenBucketsResp( std::vector handled_resource_group_names; handled_resource_group_names.reserve(resp.responses_size()); // Network to GAC is ok, update timepoint. - last_fetch_tokens_from_gac_timepoint = std::chrono::steady_clock::now(); + const auto now = SteadyClock::now(); + last_fetch_tokens_from_gac_timepoint = now; if (resp.responses().empty()) { @@ -324,7 +325,6 @@ std::vector LocalAdmissionController::handleTokenBucketsResp( return {}; } - const auto now = std::chrono::steady_clock::now(); for (const resource_manager::TokenBucketResponse & one_resp : resp.responses()) { // For each resource group. diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index 7555bf85f05..0a2eb17e7a7 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -35,6 +35,7 @@ namespace DB { class LocalAdmissionController; +using SteadyClock = std::chrono::steady_clock; // gac_resp.burst_limit < 0: resource group is burstable, and will not use bucket at all. // gac_resp.burst_limit >= 0: resource group is not burstable, will use bucket to limit the speed of the resource group. @@ -263,7 +264,7 @@ class ResourceGroup final : private boost::noncopyable std::string ori_bucket_info = bucket->toString(); bucket->reConfig(TokenBucket::TokenBucketConfig(bucket->peek(), new_fill_rate, new_capacity)); - stop_trickle_timepoint = std::chrono::steady_clock::now() + std::chrono::milliseconds(trickle_ms); + stop_trickle_timepoint = SteadyClock::now() + std::chrono::milliseconds(trickle_ms); LOG_DEBUG( log, "token bucket of rg {} reconfig to trickle mode: from: {}, to: {}", @@ -306,7 +307,7 @@ class ResourceGroup final : private boost::noncopyable }; ConsumptionUpdateInfo updateConsumptionSpeedInfoIfNecessary( - const std::chrono::steady_clock::time_point & now, + const SteadyClock::time_point & now, const std::chrono::seconds & dura) { ConsumptionUpdateInfo info; @@ -335,14 +336,13 @@ class ResourceGroup final : private boost::noncopyable return info; } - bool needFetchTokenPeridically(const std::chrono::steady_clock::time_point & now, const std::chrono::seconds & dura) - const + bool needFetchTokenPeridically(const SteadyClock::time_point & now, const std::chrono::seconds & dura) const { std::lock_guard lock(mu); return std::chrono::duration_cast(now - last_fetch_tokens_from_gac_timepoint) > dura; } - void updateFetchTokenTimepoint(const std::chrono::steady_clock::time_point & tp) + void updateFetchTokenTimepoint(const SteadyClock::time_point & tp) { std::lock_guard lock(mu); assert(last_fetch_tokens_from_gac_timepoint <= tp); @@ -350,13 +350,13 @@ class ResourceGroup final : private boost::noncopyable ++fetch_tokens_from_gac_count; } - bool inTrickleModeLease(const std::chrono::steady_clock::time_point & tp) + bool inTrickleModeLease(const SteadyClock::time_point & tp) { std::lock_guard lock(mu); return bucket_mode == trickle_mode && tp < stop_trickle_timepoint; } - bool needNotifyStopTrickleMode(const std::chrono::steady_clock::time_point & tp) + bool needNotifyStopTrickleMode(const SteadyClock::time_point & tp) { std::lock_guard lock(mu); return bucket_mode == trickle_mode && tp >= stop_trickle_timepoint; @@ -399,14 +399,14 @@ class ResourceGroup final : private boost::noncopyable LoggerPtr log; - std::chrono::steady_clock::time_point last_fetch_tokens_from_gac_timepoint = std::chrono::steady_clock::now(); - std::chrono::steady_clock::time_point stop_trickle_timepoint = std::chrono::steady_clock::now(); + SteadyClock::time_point last_fetch_tokens_from_gac_timepoint = SteadyClock::now(); + SteadyClock::time_point stop_trickle_timepoint = SteadyClock::now(); uint64_t fetch_tokens_from_gac_count = 0; double total_ru_consumption = 0.0; double ru_consumption_delta = 0.0; double ru_consumption_speed = 0.0; - std::chrono::steady_clock::time_point last_update_ru_consumption_timepoint = std::chrono::steady_clock::now(); + SteadyClock::time_point last_update_ru_consumption_timepoint = SteadyClock::now(); }; using ResourceGroupPtr = std::shared_ptr; @@ -456,7 +456,7 @@ class LocalAdmissionController final : private boost::noncopyable for (const auto & resource_group : resource_groups) { const auto consumption_update_info = resource_group.second->updateConsumptionSpeedInfoIfNecessary( - std::chrono::steady_clock::time_point::max(), + SteadyClock::time_point::max(), std::chrono::seconds(0)); assert(consumption_update_info.updated); if (consumption_update_info.delta == 0.0) @@ -469,7 +469,7 @@ class LocalAdmissionController final : private boost::noncopyable fetchTokensFromGAC(acquire_infos, "before stop"); group->consumeResource(ru, cpu_time_in_ns); - if (group->lowToken() || group->needNotifyStopTrickleMode(std::chrono::steady_clock::now())) + if (group->lowToken() || group->needNotifyStopTrickleMode(SteadyClock::now())) { { std::lock_guard lock(mu); @@ -659,8 +659,7 @@ class LocalAdmissionController final : private boost::noncopyable std::unordered_set low_token_resource_groups; std::atomic max_ru_per_sec = 0; - std::chrono::time_point last_fetch_tokens_from_gac_timepoint - = std::chrono::steady_clock::now(); + std::chrono::time_point last_fetch_tokens_from_gac_timepoint = SteadyClock::now(); ::pingcap::kv::Cluster * cluster = nullptr; std::atomic need_reset_unique_client_id{false}; diff --git a/dbms/src/Flash/ResourceControl/TokenBucket.h b/dbms/src/Flash/ResourceControl/TokenBucket.h index c5367e25e4d..710df30162c 100644 --- a/dbms/src/Flash/ResourceControl/TokenBucket.h +++ b/dbms/src/Flash/ResourceControl/TokenBucket.h @@ -92,9 +92,7 @@ class TokenBucket final std::string toString() const { - FmtBuffer fmt_buf; - fmt_buf.fmtAppend("tokens: {}, fill_rate: {}, capacity: {}", tokens, fill_rate, capacity); - return fmt_buf.toString(); + return fmt::format("tokens: {}, fill_rate: {}, capacity: {}", tokens, fill_rate, capacity); } private: From 67190e5f0a844bc97dd412b3c8159cbed132988f Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 2 Nov 2023 09:59:01 +0800 Subject: [PATCH 10/12] expire Signed-off-by: guo-shaoge --- dbms/src/Flash/ResourceControl/LocalAdmissionController.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index 0a2eb17e7a7..40470e822dd 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -356,7 +356,7 @@ class ResourceGroup final : private boost::noncopyable return bucket_mode == trickle_mode && tp < stop_trickle_timepoint; } - bool needNotifyStopTrickleMode(const SteadyClock::time_point & tp) + bool trickleModeLeaseExpire(const SteadyClock::time_point & tp) { std::lock_guard lock(mu); return bucket_mode == trickle_mode && tp >= stop_trickle_timepoint; @@ -469,7 +469,7 @@ class LocalAdmissionController final : private boost::noncopyable fetchTokensFromGAC(acquire_infos, "before stop"); group->consumeResource(ru, cpu_time_in_ns); - if (group->lowToken() || group->needNotifyStopTrickleMode(SteadyClock::now())) + if (group->lowToken() || group->trickleModeLeaseExpire(SteadyClock::now())) { { std::lock_guard lock(mu); From 307d0dfa435bcf32c8d75f928fa9daa9ba990b8a Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 2 Nov 2023 10:27:16 +0800 Subject: [PATCH 11/12] fix Signed-off-by: guo-shaoge --- dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp index 626caf7c647..72830e3ac9f 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp @@ -193,7 +193,7 @@ std::optional LocalAdmissionControll DEFAULT_FETCH_GAC_INTERVAL.count(), ACQUIRE_RU_AMPLIFICATION); - if (acquire_tokens == 0.0 && token_consumption == 0.0 && resource_group->needNotifyStopTrickleMode(now)) + if (acquire_tokens == 0.0 && token_consumption == 0.0 && resource_group->trickleModeLeaseExpire(now)) { // If acquire_tokens and token_consumption are both zero, will ignore send RPC to GAC. // But we need to make sure trickle mode should exit timely, which needs to talk with GAC. From 8d9ef4376acf0fc132b4894140523ad0c598fda9 Mon Sep 17 00:00:00 2001 From: guo-shaoge Date: Thu, 2 Nov 2023 17:08:33 +0800 Subject: [PATCH 12/12] update stop() Signed-off-by: guo-shaoge --- .../LocalAdmissionController.h | 40 +++++++++---------- 1 file changed, 20 insertions(+), 20 deletions(-) diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h index 564b3cb6949..77a2a7ff784 100644 --- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h +++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h @@ -448,26 +448,6 @@ class LocalAdmissionController final : private boost::noncopyable return; } - // Report final RU consumption before stop: - // 1. to avoid RU consumption omission. - // 2. clear GAC's unique_client_id to avoid affecting burst limit calculation. - // This can happend when disagg CN is scaled-in/out frequently. - std::vector acquire_infos; - for (const auto & resource_group : resource_groups) - { - const auto consumption_update_info = resource_group.second->updateConsumptionSpeedInfoIfNecessary( - SteadyClock::time_point::max(), - std::chrono::seconds(0)); - assert(consumption_update_info.updated); - if (consumption_update_info.delta == 0.0) - continue; - acquire_infos.push_back( - {.resource_group_name = resource_group.first, - .acquire_tokens = 0, - .ru_consumption_delta = consumption_update_info.delta}); - } - fetchTokensFromGAC(acquire_infos, "before stop"); - group->consumeResource(ru, cpu_time_in_ns); if (group->lowToken() || group->trickleModeLeaseExpire(SteadyClock::now())) { @@ -546,6 +526,26 @@ class LocalAdmissionController final : private boost::noncopyable thread.join(); } + // Report final RU consumption before stop: + // 1. to avoid RU consumption omission. + // 2. clear GAC's unique_client_id to avoid affecting burst limit calculation. + // This can happend when disagg CN is scaled-in/out frequently. + std::vector acquire_infos; + for (const auto & resource_group : resource_groups) + { + const auto consumption_update_info = resource_group.second->updateConsumptionSpeedInfoIfNecessary( + SteadyClock::time_point::max(), + std::chrono::seconds(0)); + assert(consumption_update_info.updated); + if (consumption_update_info.delta == 0.0) + continue; + acquire_infos.push_back( + {.resource_group_name = resource_group.first, + .acquire_tokens = 0, + .ru_consumption_delta = consumption_update_info.delta}); + } + fetchTokensFromGAC(acquire_infos, "before stop"); + if (need_reset_unique_client_id.load()) { try