Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refine LAC computation of avg_speed #8277

Merged
merged 13 commits into from
Nov 2, 2023
24 changes: 21 additions & 3 deletions dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,23 +167,41 @@ std::optional<LocalAdmissionController::AcquireTokenInfo> LocalAdmissionControll
const ResourceGroupPtr & resource_group,
bool is_periodically_fetch)
{
double token_consumption = resource_group->getAndCleanConsumptionDelta();
double token_consumption = 0.0;
double acquire_tokens = 0.0;
const auto now = std::chrono::steady_clock::now();
SeaRise marked this conversation as resolved.
Show resolved Hide resolved

const auto consumption_update_info
= resource_group->updateConsumptionSpeedInfoIfNecessary(now, DEFAULT_FETCH_GAC_INTERVAL);
if (consumption_update_info.updated)
token_consumption = consumption_update_info.delta;

auto get_acquire_tokens = [&]() {
if (resource_group->burstable)
return;

// Skip the periodic token fetch if it would run immediately after a low-token fetch.
const auto now = std::chrono::steady_clock::now();
if (is_periodically_fetch && !resource_group->needFetchTokenPeridically(now, DEFAULT_FETCH_GAC_INTERVAL))
return;

// During trickle mode, no need to fetch tokens from GAC.
if (resource_group->inTrickleModeLease(now))
return;

acquire_tokens = resource_group->getAcquireRUNum(DEFAULT_FETCH_GAC_INTERVAL.count(), ACQUIRE_RU_AMPLIFICATION);
acquire_tokens = resource_group->getAcquireRUNum(
consumption_update_info.speed,
DEFAULT_FETCH_GAC_INTERVAL.count(),
ACQUIRE_RU_AMPLIFICATION);

if (acquire_tokens == 0.0 && token_consumption == 0.0 && resource_group->needNotifyStopTrickleMode(now))
{
// If acquire_tokens and token_consumption are both zero, no RPC would be sent to GAC.
// But exiting trickle mode in a timely manner requires talking to GAC,
// so we force-acquire 1 RU.
LOG_DEBUG(log, "force acquire 1RU because of try to exit trickle mode");
acquire_tokens = 1.0;
}

assert(acquire_tokens >= 0.0);
};

Expand Down
120 changes: 90 additions & 30 deletions dbms/src/Flash/ResourceControl/LocalAdmissionController.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,23 +175,21 @@ class ResourceGroup final : private boost::noncopyable
}

// Return how many tokens should be acquired from GAC for the next n seconds.
double getAcquireRUNum(uint32_t n, double amplification) const
double getAcquireRUNum(double speed, uint32_t n_sec, double amplification) const
{
assert(amplification >= 1.0);

double avg_speed = 0.0;
double remaining_ru = 0.0;
double base = 0.0;
{
std::lock_guard lock(mu);
avg_speed = bucket->getAvgSpeedPerSec();
remaining_ru = bucket->peek();
base = static_cast<double>(user_ru_per_sec);
}

// Appropriate amplification is necessary to prevent the situation where GAC has sufficient RU
// but user query speed is limited because LAC requests too few RU.
double acquire_num = avg_speed * n * amplification;
double acquire_num = speed * n_sec * amplification;

// Prevent avg_speed from being 0 due to RU exhaustion.
if (acquire_num == 0.0 && remaining_ru <= 0.0)
Expand All @@ -205,7 +203,7 @@ class ResourceGroup final : private boost::noncopyable
"acquire num for rg {}: avg_speed: {}, remaining_ru: {}, base: {}, amplification: {}, "
"acquire num: {}",
name,
avg_speed,
speed,
remaining_ru,
base,
amplification,
Expand Down Expand Up @@ -282,11 +280,10 @@ class ResourceGroup final : private boost::noncopyable
return;

bucket_mode = TokenBucketMode::degrade_mode;
double avg_speed = bucket->getAvgSpeedPerSec();
auto config = bucket->getConfig();
std::string ori_bucket_info = bucket->toString();

config.fill_rate = avg_speed;
config.fill_rate = ru_consumption_speed;
bucket->reConfig(config);
LOG_INFO(
log,
Expand All @@ -296,13 +293,46 @@ class ResourceGroup final : private boost::noncopyable
bucket->toString());
}

double getAndCleanConsumptionDelta()
struct ConsumptionUpdateInfo
{
std::lock_guard lock(mu);
auto ori = ru_consumption_delta;
ru_consumption_delta = 0.0;
total_consumption += ori;
return ori;
// Average RU consumption speed over the time range [last_update_ru_consumption_timepoint, now].
double speed = 0.0;
// RU consumption since last_update_ru_consumption_timepoint.
double delta = 0.0;
// Total RU consumption of all time.
double total = 0.0;
// Whether speed and delta were updated.
bool updated = false;
};

// Flushes the RU consumed since the last update into the running total and refreshes the
// average consumption speed, but only if at least `dura` has passed since the last update.
//
// now:  timepoint treated as the end of the current measuring window.
// dura: minimum window length before an update happens (the elapsed time is truncated to
//       whole seconds before the comparison).
//
// Returns a ConsumptionUpdateInfo:
//   - window not yet elapsed: updated == false, speed and delta carry the current (unflushed)
//     values, total keeps its default of 0.0, and no state is modified.
//   - otherwise: updated == true, delta is the flushed consumption, total includes it, and the
//     delta counter and the last-update timepoint are reset.
ConsumptionUpdateInfo updateConsumptionSpeedInfoIfNecessary(
    const std::chrono::steady_clock::time_point & now,
    const std::chrono::seconds & dura)
{
    ConsumptionUpdateInfo info;
    {
        std::lock_guard lock(mu);
        const auto elapsed
            = std::chrono::duration_cast<std::chrono::seconds>(now - last_update_ru_consumption_timepoint);

        if (elapsed < dura)
            return {.speed = ru_consumption_speed, .delta = ru_consumption_delta, .updated = false};

        // elapsed is truncated to whole seconds, so it can be 0 when dura is 0 (callers do pass
        // std::chrono::seconds(0)). Dividing by it would produce inf/NaN, so a window without a
        // positive length keeps the previously computed speed instead.
        if (elapsed.count() > 0)
            ru_consumption_speed = ru_consumption_delta / static_cast<double>(elapsed.count());
        total_ru_consumption += ru_consumption_delta;

        info.speed = ru_consumption_speed;
        info.total = total_ru_consumption;
        info.delta = ru_consumption_delta;
        info.updated = true;

        // Start a new measuring window.
        ru_consumption_delta = 0;
        last_update_ru_consumption_timepoint = now;
    }

    // Metrics are set from the local copy after the lock scope has ended.
    GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_avg_speed, name).Set(info.speed);
    GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_total_consumption, name).Set(info.total);
    return info;
}

bool needFetchTokenPeridically(const std::chrono::steady_clock::time_point & now, const std::chrono::seconds & dura)
Expand All @@ -326,17 +356,26 @@ class ResourceGroup final : private boost::noncopyable
return bucket_mode == trickle_mode && tp < stop_trickle_timepoint;
}

void collectMetrics() const
bool needNotifyStopTrickleMode(const std::chrono::steady_clock::time_point & tp)
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
{
std::lock_guard lock(mu);
const auto & config = bucket->getConfig();
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_remaining_tokens, name).Set(config.tokens);
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_avg_speed, name).Set(bucket->getAvgSpeedPerSec());
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_total_consumption, name).Set(total_consumption);
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_bucket_fill_rate, name).Set(config.fill_rate);
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_bucket_capacity, name).Set(config.capacity);
return bucket_mode == trickle_mode && tp >= stop_trickle_timepoint;
}

void collectMetrics() const
{
TokenBucket::TokenBucketConfig local_config;
uint64_t local_fetch_count = 0;
{
std::lock_guard lock(mu);
local_config = bucket->getConfig();
local_fetch_count = fetch_tokens_from_gac_count;
}
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_remaining_tokens, name).Set(local_config.tokens);
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_bucket_fill_rate, name).Set(local_config.fill_rate);
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_bucket_capacity, name).Set(local_config.capacity);
GET_RESOURCE_GROUP_METRIC(tiflash_resource_group, type_fetch_tokens_from_gac_count, name)
.Set(fetch_tokens_from_gac_count);
.Set(local_fetch_count);
}

const std::string name;
Expand All @@ -358,15 +397,16 @@ class ResourceGroup final : private boost::noncopyable

TokenBucketMode bucket_mode = TokenBucketMode::normal_mode;

double ru_consumption_delta = 0.0;

LoggerPtr log;

std::chrono::time_point<std::chrono::steady_clock> last_fetch_tokens_from_gac_timepoint
= std::chrono::steady_clock::now();
std::chrono::time_point<std::chrono::steady_clock> stop_trickle_timepoint = std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point last_fetch_tokens_from_gac_timepoint = std::chrono::steady_clock::now();
std::chrono::steady_clock::time_point stop_trickle_timepoint = std::chrono::steady_clock::now();
uint64_t fetch_tokens_from_gac_count = 0;
double total_consumption = 0.0;
double total_ru_consumption = 0.0;

double ru_consumption_delta = 0.0;
double ru_consumption_speed = 0.0;
std::chrono::steady_clock::time_point last_update_ru_consumption_timepoint = std::chrono::steady_clock::now();
};

using ResourceGroupPtr = std::shared_ptr<ResourceGroup>;
Expand Down Expand Up @@ -408,8 +448,28 @@ class LocalAdmissionController final : private boost::noncopyable
return;
}

// Report final RU consumption before stop:
// 1. to avoid RU consumption omission.
// 2. clear GAC's unique_client_id to avoid affecting burst limit calculation.
// This can happen when disagg CN is scaled-in/out frequently.
std::vector<AcquireTokenInfo> acquire_infos;
for (const auto & resource_group : resource_groups)
{
const auto consumption_update_info = resource_group.second->updateConsumptionSpeedInfoIfNecessary(
std::chrono::steady_clock::time_point::max(),
std::chrono::seconds(0));
assert(consumption_update_info.updated);
if (consumption_update_info.delta == 0.0)
continue;
acquire_infos.push_back(
{.resource_group_name = resource_group.first,
.acquire_tokens = 0,
.ru_consumption_delta = consumption_update_info.delta});
}
fetchTokensFromGAC(acquire_infos, "before stop");

group->consumeResource(ru, cpu_time_in_ns);
if (group->lowToken())
if (group->lowToken() || group->needNotifyStopTrickleMode(std::chrono::steady_clock::now()))
{
{
std::lock_guard lock(mu);
Expand Down Expand Up @@ -510,7 +570,7 @@ class LocalAdmissionController final : private boost::noncopyable
static constexpr auto DEGRADE_MODE_DURATION = std::chrono::seconds(120);
static constexpr auto TARGET_REQUEST_PERIOD_MS = std::chrono::milliseconds(5000);
static constexpr auto COLLECT_METRIC_INTERVAL = std::chrono::seconds(5);
static constexpr double ACQUIRE_RU_AMPLIFICATION = 1.1;
static constexpr double ACQUIRE_RU_AMPLIFICATION = 1.5;

static const std::string GAC_RESOURCE_GROUP_ETCD_PATH;
static const std::string WATCH_GAC_ERR_PREFIX;
Expand Down Expand Up @@ -576,7 +636,7 @@ class LocalAdmissionController final : private boost::noncopyable
// Utilities for fetch token from GAC.
void fetchTokensForLowTokenResourceGroups();
void fetchTokensForAllResourceGroups();
static std::optional<AcquireTokenInfo> buildAcquireInfo(
std::optional<AcquireTokenInfo> buildAcquireInfo(
const ResourceGroupPtr & resource_group,
bool is_periodically_fetch);

Expand Down
29 changes: 0 additions & 29 deletions dbms/src/Flash/ResourceControl/TokenBucket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,35 +53,6 @@ void TokenBucket::reConfig(const TokenBucketConfig & config)
capacity = config.capacity;

compact(now);
// Update because the token number may increase, which may cause token_changed to be negative.
last_get_avg_speed_tokens = tokens;
last_get_avg_speed_timepoint = std::chrono::steady_clock::now();
}

// Returns the average per-second change in tokens (previous sample minus current tokens).
// The sample is refreshed only once at least one whole second has passed since the previous
// sample; otherwise the previously computed value is returned unchanged.
double TokenBucket::getAvgSpeedPerSec()
{
    const auto current = std::chrono::steady_clock::now();
    RUNTIME_CHECK(current >= last_get_avg_speed_timepoint);
    const auto elapsed_sec
        = std::chrono::duration_cast<std::chrono::seconds>(current - last_get_avg_speed_timepoint).count();

    // compact() is called before `tokens` is read for the difference below.
    compact(current);
    const double consumed = last_get_avg_speed_tokens - tokens;

    // A window shorter than one second keeps the last computed speed.
    const bool window_elapsed = elapsed_sec >= 1;
    if (window_elapsed)
    {
        avg_speed_per_sec = consumed / elapsed_sec;
        last_get_avg_speed_tokens = tokens;
        last_get_avg_speed_timepoint = current;
    }

    LOG_TRACE(
        log,
        "getAvgSpeedPerSec dura: {}, last_get_avg_speed_tokens: {}, cur tokens: {}, avg_speed_per_sec: {}",
        elapsed_sec,
        last_get_avg_speed_tokens,
        tokens,
        avg_speed_per_sec);
    return avg_speed_per_sec;
}

void TokenBucket::compact(const TokenBucket::TimePoint & timepoint)
Expand Down
22 changes: 7 additions & 15 deletions dbms/src/Flash/ResourceControl/TokenBucket.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,6 @@ class TokenBucket final
, tokens(init_tokens_)
, capacity(capacity_)
, last_compact_timepoint(std::chrono::steady_clock::now())
, last_get_avg_speed_timepoint(std::chrono::steady_clock::now())
, last_get_avg_speed_tokens(init_tokens_)
, avg_speed_per_sec(0.0)
, low_token_threshold(LOW_TOKEN_THRESHOLD_RATE * capacity_)
, log(Logger::get(log_id))
{}
Expand All @@ -54,6 +51,12 @@ class TokenBucket final

struct TokenBucketConfig
{
// Default constructor: initializes tokens, fill_rate and capacity to 0.0.
TokenBucketConfig()
: tokens(0.0)
, fill_rate(0.0)
, capacity(0.0)
{}

TokenBucketConfig(double tokens_, double fill_rate_, double capacity_)
: tokens(tokens_)
, fill_rate(fill_rate_)
Expand Down Expand Up @@ -83,21 +86,14 @@ class TokenBucket final
return {tokens, fill_rate, capacity};
}

double getAvgSpeedPerSec();

// Returns true when the value reported by peek() is at or below low_token_threshold.
bool lowToken() const { return peek() <= low_token_threshold; }

// Returns true when fill_rate is exactly 0.0.
bool isStatic() const { return fill_rate == 0.0; }

std::string toString() const
{
FmtBuffer fmt_buf;
fmt_buf.fmtAppend(
"tokens: {}, fill_rate: {}, capacity: {}, avg_speed_per_sec: {}",
tokens,
fill_rate,
capacity,
avg_speed_per_sec);
fmt_buf.fmtAppend("tokens: {}, fill_rate: {}, capacity: {}", tokens, fill_rate, capacity);
SeaRise marked this conversation as resolved.
Show resolved Hide resolved
return fmt_buf.toString();
}

Expand All @@ -116,10 +112,6 @@ class TokenBucket final

TimePoint last_compact_timepoint;

TimePoint last_get_avg_speed_timepoint;
double last_get_avg_speed_tokens;
double avg_speed_per_sec;

double low_token_threshold;

LoggerPtr log;
Expand Down