Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

make tiflash scale-in faster #8432

Merged
merged 9 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions dbms/src/Common/Config/ConfigReloader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ ConfigReloader::~ConfigReloader()
{
quit = true;

cv.notify_all();
if (thread.joinable())
thread.join();
}
Expand All @@ -62,18 +63,16 @@ void ConfigReloader::run()

while (true)
{
if (quit)
std::unique_lock lock(reload_mutex);
if (cv.wait_for(lock, reload_interval, [this]() { return quit.load(); }))
return;

std::this_thread::sleep_for(reload_interval);
reloadIfNewer(false, /* throw_on_error = */ false);
}
}

void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error)
{
std::lock_guard lock(reload_mutex);

FilesChangesTracker new_files = getNewFileList();
bool config_object_updated = false;
for (const auto & conf : config_objects)
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/Config/ConfigReloader.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ class ConfigReloader

std::atomic_bool quit{false};
std::thread thread;

std::condition_variable cv;
};

} // namespace DB
3 changes: 2 additions & 1 deletion dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ namespace DB
{
void LocalAdmissionController::warmupResourceGroupInfoCache(const std::string & name)
{
assert(!stopped);
if (unlikely(stopped))
return;

if (name.empty())
return;
Expand Down
92 changes: 53 additions & 39 deletions dbms/src/Flash/ResourceControl/LocalAdmissionController.h
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,9 @@ class LocalAdmissionController final : private boost::noncopyable

uint64_t estWaitDuraMS(const std::string & name) const
{
if (unlikely(stopped))
return 0;

if (name.empty())
return 0;

Expand All @@ -472,7 +475,8 @@ class LocalAdmissionController final : private boost::noncopyable

std::optional<uint64_t> getPriority(const std::string & name)
{
assert(!stopped);
if (unlikely(stopped))
return {HIGHEST_RESOURCE_GROUP_PRIORITY};

if (name.empty())
return {HIGHEST_RESOURCE_GROUP_PRIORITY};
Expand All @@ -495,7 +499,9 @@ class LocalAdmissionController final : private boost::noncopyable

void registerRefillTokenCallback(const std::function<void()> & cb)
{
assert(!stopped);
if (unlikely(stopped))
return;

// NOTE: Better not use lock inside refill_token_callback,
// because LAC needs to lock when calling refill_token_callback,
// which may introduce dead lock.
Expand All @@ -506,53 +512,22 @@ class LocalAdmissionController final : private boost::noncopyable

void unregisterRefillTokenCallback()
{
assert(!stopped);
if (unlikely(stopped))
return;

std::lock_guard lock(mu);
RUNTIME_CHECK_MSG(refill_token_callback != nullptr, "callback cannot be nullptr before unregistering");
refill_token_callback = nullptr;
}

#ifdef DBMS_PUBLIC_GTEST
static std::unique_ptr<MockLocalAdmissionController> global_instance;
#else
static std::unique_ptr<LocalAdmissionController> global_instance;
#endif

// Interval of fetch from GAC periodically.
static constexpr auto DEFAULT_FETCH_GAC_INTERVAL = std::chrono::seconds(5);
static constexpr auto DEFAULT_FETCH_GAC_INTERVAL_MS = 5000;

private:
void consumeResource(const std::string & name, double ru, uint64_t cpu_time_in_ns)
void stop()
{
assert(!stopped);

// When tidb_enable_resource_control is disabled, resource group name is empty.
if (name.empty())
return;

ResourceGroupPtr group = findResourceGroup(name);
if unlikely (!group)
if (stopped)
{
LOG_INFO(log, "cannot consume ru for {}, maybe it has been deleted", name);
LOG_DEBUG(log, "LAC already stopped");
return;
}

group->consumeResource(ru, cpu_time_in_ns);
if (group->lowToken() || group->trickleModeLeaseExpire(SteadyClock::now()))
{
{
std::lock_guard lock(mu);
low_token_resource_groups.insert(name);
}
cv.notify_one();
}
}

void stop()
{
if (stopped)
return;
stopped.store(true);

// TryCancel() is thread safe(https://github.com/grpc/grpc/pull/30416).
Expand Down Expand Up @@ -602,6 +577,45 @@ class LocalAdmissionController final : private boost::noncopyable
getCurrentExceptionMessage(false));
}
}
LOG_INFO(log, "LAC stopped done: final report size: {}", acquire_infos.size());
}

#ifdef DBMS_PUBLIC_GTEST
static std::unique_ptr<MockLocalAdmissionController> global_instance;
#else
static std::unique_ptr<LocalAdmissionController> global_instance;
#endif

// Interval of fetch from GAC periodically.
static constexpr auto DEFAULT_FETCH_GAC_INTERVAL = std::chrono::seconds(5);
static constexpr auto DEFAULT_FETCH_GAC_INTERVAL_MS = 5000;

private:
void consumeResource(const std::string & name, double ru, uint64_t cpu_time_in_ns)
{
if (unlikely(stopped))
return;

// When tidb_enable_resource_control is disabled, resource group name is empty.
if (name.empty())
return;

ResourceGroupPtr group = findResourceGroup(name);
if unlikely (!group)
{
LOG_INFO(log, "cannot consume ru for {}, maybe it has been deleted", name);
return;
}

group->consumeResource(ru, cpu_time_in_ns);
if (group->lowToken() || group->trickleModeLeaseExpire(SteadyClock::now()))
{
{
std::lock_guard lock(mu);
low_token_resource_groups.insert(name);
}
cv.notify_one();
}
}

// If we cannot get GAC resp for DEGRADE_MODE_DURATION seconds, enter degrade mode.
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Server/MetricsPrometheus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ MetricsPrometheus::MetricsPrometheus(Context & context, const AsynchronousMetric

MetricsPrometheus::~MetricsPrometheus()
{
timer.cancel(true);
timer.cancel(false);
}

void MetricsPrometheus::run()
Expand Down
110 changes: 60 additions & 50 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1562,53 +1562,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
});

auto & tmt_context = global_context->getTMTContext();

// For test mode, TaskScheduler and LAC is controlled by test case.
// TODO: resource control is not supported for WN. So disable pipeline model and LAC.
const bool init_pipeline_and_lac
= !global_context->isTest() && !global_context->getSharedContextDisagg()->isDisaggregatedStorageMode();
if (init_pipeline_and_lac)
{
#ifdef DBMS_PUBLIC_GTEST
LocalAdmissionController::global_instance = std::make_unique<MockLocalAdmissionController>();
#else
LocalAdmissionController::global_instance
= std::make_unique<LocalAdmissionController>(tmt_context.getKVCluster(), tmt_context.getEtcdClient());
#endif

auto get_pool_size = [](const auto & setting) {
return setting == 0 ? getNumberOfLogicalCPUCores() : static_cast<size_t>(setting);
};
TaskSchedulerConfig config{
{get_pool_size(settings.pipeline_cpu_task_thread_pool_size),
settings.pipeline_cpu_task_thread_pool_queue_type},
{get_pool_size(settings.pipeline_io_task_thread_pool_size),
settings.pipeline_io_task_thread_pool_queue_type},
};
RUNTIME_CHECK(!TaskScheduler::instance);
TaskScheduler::instance = std::make_unique<TaskScheduler>(config);
LOG_INFO(log, "init pipeline task scheduler with {}", config.toString());
}

SCOPE_EXIT({
if (init_pipeline_and_lac)
{
assert(TaskScheduler::instance);
TaskScheduler::instance.reset();
assert(LocalAdmissionController::global_instance);
LocalAdmissionController::global_instance.reset();
}
});

if (settings.enable_async_grpc_client)
{
auto size = settings.grpc_completion_queue_pool_size;
if (size == 0)
size = std::thread::hardware_concurrency();
GRPCCompletionQueuePool::global_instance = std::make_unique<GRPCCompletionQueuePool>(size);
}

// FIXME: (bootstrap) we should bootstrap the tiflash node more early!
if (global_context->getSharedContextDisagg()->notDisaggregatedMode()
|| /*has_been_bootstrap*/ store_ident.has_value())
Expand All @@ -1629,9 +1582,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
wn_ps->waitUntilInitedFromRemoteStore();
}

/// Then, startup grpc server to serve raft and/or flash services.
FlashGrpcServerHolder flash_grpc_server_holder(this->context(), this->config(), raft_config, log);

{
TcpHttpServersHolder tcpHttpServersHolder(*this, settings, log);

Expand Down Expand Up @@ -1678,6 +1628,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
auto metrics_prometheus = std::make_unique<MetricsPrometheus>(*global_context, async_metrics);

SessionCleaner session_cleaner(*global_context);
auto & tmt_context = global_context->getTMTContext();

if (proxy_conf.is_proxy_runnable)
{
Expand Down Expand Up @@ -1792,6 +1743,65 @@ int Server::main(const std::vector<std::string> & /*args*/)
GET_METRIC(tiflash_server_info, start_time).Set(ts.epochTime());
}

// For test mode, TaskScheduler and LAC is controlled by test case.
// TODO: resource control is not supported for WN. So disable pipeline model and LAC.
const bool init_pipeline_and_lac
= !global_context->isTest() && !global_context->getSharedContextDisagg()->isDisaggregatedStorageMode();
if (init_pipeline_and_lac)
{
#ifdef DBMS_PUBLIC_GTEST
LocalAdmissionController::global_instance = std::make_unique<MockLocalAdmissionController>();
#else
LocalAdmissionController::global_instance
= std::make_unique<LocalAdmissionController>(tmt_context.getKVCluster(), tmt_context.getEtcdClient());
#endif

auto get_pool_size = [](const auto & setting) {
return setting == 0 ? getNumberOfLogicalCPUCores() : static_cast<size_t>(setting);
};
TaskSchedulerConfig config{
{get_pool_size(settings.pipeline_cpu_task_thread_pool_size),
settings.pipeline_cpu_task_thread_pool_queue_type},
{get_pool_size(settings.pipeline_io_task_thread_pool_size),
settings.pipeline_io_task_thread_pool_queue_type},
};
RUNTIME_CHECK(!TaskScheduler::instance);
TaskScheduler::instance = std::make_unique<TaskScheduler>(config);
LOG_INFO(log, "init pipeline task scheduler with {}", config.toString());
}

SCOPE_EXIT({
if (init_pipeline_and_lac)
{
assert(TaskScheduler::instance);
TaskScheduler::instance.reset();
assert(LocalAdmissionController::global_instance);
LocalAdmissionController::global_instance.reset();
}
});

if (settings.enable_async_grpc_client)
{
auto size = settings.grpc_completion_queue_pool_size;
if (size == 0)
size = std::thread::hardware_concurrency();
GRPCCompletionQueuePool::global_instance = std::make_unique<GRPCCompletionQueuePool>(size);
}

/// startup grpc server to serve raft and/or flash services.
FlashGrpcServerHolder flash_grpc_server_holder(this->context(), this->config(), raft_config, log);

SCOPE_EXIT({
// Stop LAC for AutoScaler managed CN before FlashGrpcServerHolder is destructed.
// Because AutoScaler it will kill tiflash process when port of flash_server_addr is down.
// And we want to make sure LAC is cleanedup.
// The effects are there will be no resource control during [lac.stop(), FlashGrpcServer destruct done],
// but it's basically ok, that duration is small(normally 100-200ms).
if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode() && use_autoscaler
&& LocalAdmissionController::global_instance)
LocalAdmissionController::global_instance->stop();
});

tmt_context.setStatusRunning();

try
Expand Down