diff --git a/dbms/src/Common/Config/ConfigReloader.cpp b/dbms/src/Common/Config/ConfigReloader.cpp
index ec1caa0fe34..f86761e1b0f 100644
--- a/dbms/src/Common/Config/ConfigReloader.cpp
+++ b/dbms/src/Common/Config/ConfigReloader.cpp
@@ -46,6 +46,7 @@ ConfigReloader::~ConfigReloader()
 {
     quit = true;
+    cv.notify_all();

     if (thread.joinable())
         thread.join();
 }
@@ -62,18 +63,16 @@ void ConfigReloader::run()
     while (true)
     {
-        if (quit)
+        std::unique_lock lock(reload_mutex);
+        if (cv.wait_for(lock, reload_interval, [this]() { return quit.load(); }))
             return;

-        std::this_thread::sleep_for(reload_interval);

         reloadIfNewer(false, /* throw_on_error = */ false);
     }
 }

 void ConfigReloader::reloadIfNewer(bool force, bool throw_on_error)
 {
-    std::lock_guard lock(reload_mutex);
-
     FilesChangesTracker new_files = getNewFileList();
     bool config_object_updated = false;
     for (const auto & conf : config_objects)
diff --git a/dbms/src/Common/Config/ConfigReloader.h b/dbms/src/Common/Config/ConfigReloader.h
index cb44aa7712f..194125356cb 100644
--- a/dbms/src/Common/Config/ConfigReloader.h
+++ b/dbms/src/Common/Config/ConfigReloader.h
@@ -92,6 +92,8 @@ class ConfigReloader

     std::atomic_bool quit{false};
     std::thread thread;
+
+    std::condition_variable cv;
 };

 } // namespace DB
diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp
index a6a0a6b6fbc..35e82d263e5 100644
--- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp
+++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.cpp
@@ -22,7 +22,8 @@ namespace DB
 {
 void LocalAdmissionController::warmupResourceGroupInfoCache(const std::string & name)
 {
-    assert(!stopped);
+    if (unlikely(stopped))
+        return;

     if (name.empty())
         return;
diff --git a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h
index 76d2dafe7b4..80bb4c077ed 100644
--- a/dbms/src/Flash/ResourceControl/LocalAdmissionController.h
+++ b/dbms/src/Flash/ResourceControl/LocalAdmissionController.h
@@ -458,6 +458,9 @@ class LocalAdmissionController final : private boost::noncopyable

     uint64_t estWaitDuraMS(const std::string & name) const
     {
+        if (unlikely(stopped))
+            return 0;
+
         if (name.empty())
             return 0;

@@ -472,7 +475,8 @@ class LocalAdmissionController final : private boost::noncopyable

     std::optional<uint64_t> getPriority(const std::string & name)
     {
-        assert(!stopped);
+        if (unlikely(stopped))
+            return {HIGHEST_RESOURCE_GROUP_PRIORITY};

         if (name.empty())
             return {HIGHEST_RESOURCE_GROUP_PRIORITY};

@@ -495,7 +499,9 @@ class LocalAdmissionController final : private boost::noncopyable

     void registerRefillTokenCallback(const std::function<void()> & cb)
     {
-        assert(!stopped);
+        if (unlikely(stopped))
+            return;
+
         // NOTE: Better not use lock inside refill_token_callback,
         // because LAC needs to lock when calling refill_token_callback,
         // which may introduce dead lock.
@@ -506,53 +512,22 @@ class LocalAdmissionController final : private boost::noncopyable

     void unregisterRefillTokenCallback()
     {
-        assert(!stopped);
+        if (unlikely(stopped))
+            return;
+
         std::lock_guard lock(mu);
         RUNTIME_CHECK_MSG(refill_token_callback != nullptr, "callback cannot be nullptr before unregistering");
         refill_token_callback = nullptr;
     }

-#ifdef DBMS_PUBLIC_GTEST
-    static std::unique_ptr<MockLocalAdmissionController> global_instance;
-#else
-    static std::unique_ptr<LocalAdmissionController> global_instance;
-#endif
-
-    // Interval of fetch from GAC periodically.
-    static constexpr auto DEFAULT_FETCH_GAC_INTERVAL = std::chrono::seconds(5);
-    static constexpr auto DEFAULT_FETCH_GAC_INTERVAL_MS = 5000;
-
-private:
-    void consumeResource(const std::string & name, double ru, uint64_t cpu_time_in_ns)
+    void stop()
     {
-        assert(!stopped);
-
-        // When tidb_enable_resource_control is disabled, resource group name is empty.
-        if (name.empty())
-            return;
-
-        ResourceGroupPtr group = findResourceGroup(name);
-        if unlikely (!group)
+        if (stopped)
         {
-            LOG_INFO(log, "cannot consume ru for {}, maybe it has been deleted", name);
+            LOG_DEBUG(log, "LAC already stopped");
             return;
         }

-        group->consumeResource(ru, cpu_time_in_ns);
-        if (group->lowToken() || group->trickleModeLeaseExpire(SteadyClock::now()))
-        {
-            {
-                std::lock_guard lock(mu);
-                low_token_resource_groups.insert(name);
-            }
-            cv.notify_one();
-        }
-    }
-
-    void stop()
-    {
-        if (stopped)
-            return;
         stopped.store(true);

         // TryCancel() is thread safe(https://github.com/grpc/grpc/pull/30416).
@@ -602,6 +577,45 @@ class LocalAdmissionController final : private boost::noncopyable
                     getCurrentExceptionMessage(false));
             }
         }
+        LOG_INFO(log, "LAC stop done: final report size: {}", acquire_infos.size());
+    }
+
+#ifdef DBMS_PUBLIC_GTEST
+    static std::unique_ptr<MockLocalAdmissionController> global_instance;
+#else
+    static std::unique_ptr<LocalAdmissionController> global_instance;
+#endif
+
+    // Interval of periodic fetches from GAC.
+    static constexpr auto DEFAULT_FETCH_GAC_INTERVAL = std::chrono::seconds(5);
+    static constexpr auto DEFAULT_FETCH_GAC_INTERVAL_MS = 5000;
+
+private:
+    void consumeResource(const std::string & name, double ru, uint64_t cpu_time_in_ns)
+    {
+        if (unlikely(stopped))
+            return;
+
+        // When tidb_enable_resource_control is disabled, resource group name is empty.
+        if (name.empty())
+            return;
+
+        ResourceGroupPtr group = findResourceGroup(name);
+        if unlikely (!group)
+        {
+            LOG_INFO(log, "cannot consume ru for {}, maybe it has been deleted", name);
+            return;
+        }
+
+        group->consumeResource(ru, cpu_time_in_ns);
+        if (group->lowToken() || group->trickleModeLeaseExpire(SteadyClock::now()))
+        {
+            {
+                std::lock_guard lock(mu);
+                low_token_resource_groups.insert(name);
+            }
+            cv.notify_one();
+        }
     }

     // If we cannot get GAC resp for DEGRADE_MODE_DURATION seconds, enter degrade mode.
diff --git a/dbms/src/Server/MetricsPrometheus.cpp b/dbms/src/Server/MetricsPrometheus.cpp
index 0f59e77a0bd..7d99611b195 100644
--- a/dbms/src/Server/MetricsPrometheus.cpp
+++ b/dbms/src/Server/MetricsPrometheus.cpp
@@ -311,7 +311,7 @@ MetricsPrometheus::MetricsPrometheus(Context & context, const AsynchronousMetric

 MetricsPrometheus::~MetricsPrometheus()
 {
-    timer.cancel(true);
+    timer.cancel(false);
 }

 void MetricsPrometheus::run()
diff --git a/dbms/src/Server/Server.cpp b/dbms/src/Server/Server.cpp
index 5acfcd4eb90..d6fbae985ed 100644
--- a/dbms/src/Server/Server.cpp
+++ b/dbms/src/Server/Server.cpp
@@ -1562,53 +1562,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
         }
     });

-    auto & tmt_context = global_context->getTMTContext();
-
-    // For test mode, TaskScheduler and LAC is controlled by test case.
-    // TODO: resource control is not supported for WN. So disable pipeline model and LAC.
-    const bool init_pipeline_and_lac
-        = !global_context->isTest() && !global_context->getSharedContextDisagg()->isDisaggregatedStorageMode();
-    if (init_pipeline_and_lac)
-    {
-#ifdef DBMS_PUBLIC_GTEST
-        LocalAdmissionController::global_instance = std::make_unique<MockLocalAdmissionController>();
-#else
-        LocalAdmissionController::global_instance
-            = std::make_unique<LocalAdmissionController>(tmt_context.getKVCluster(), tmt_context.getEtcdClient());
-#endif
-
-        auto get_pool_size = [](const auto & setting) {
-            return setting == 0 ? getNumberOfLogicalCPUCores() : static_cast<size_t>(setting);
-        };
-        TaskSchedulerConfig config{
-            {get_pool_size(settings.pipeline_cpu_task_thread_pool_size),
-             settings.pipeline_cpu_task_thread_pool_queue_type},
-            {get_pool_size(settings.pipeline_io_task_thread_pool_size),
-             settings.pipeline_io_task_thread_pool_queue_type},
-        };
-        RUNTIME_CHECK(!TaskScheduler::instance);
-        TaskScheduler::instance = std::make_unique<TaskScheduler>(config);
-        LOG_INFO(log, "init pipeline task scheduler with {}", config.toString());
-    }
-
-    SCOPE_EXIT({
-        if (init_pipeline_and_lac)
-        {
-            assert(TaskScheduler::instance);
-            TaskScheduler::instance.reset();
-            assert(LocalAdmissionController::global_instance);
-            LocalAdmissionController::global_instance.reset();
-        }
-    });
-
-    if (settings.enable_async_grpc_client)
-    {
-        auto size = settings.grpc_completion_queue_pool_size;
-        if (size == 0)
-            size = std::thread::hardware_concurrency();
-        GRPCCompletionQueuePool::global_instance = std::make_unique<GRPCCompletionQueuePool>(size);
-    }
-
     // FIXME: (bootstrap) we should bootstrap the tiflash node more early!
     if (global_context->getSharedContextDisagg()->notDisaggregatedMode()
         || /*has_been_bootstrap*/ store_ident.has_value())
@@ -1629,9 +1582,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
         wn_ps->waitUntilInitedFromRemoteStore();
     }

-    /// Then, startup grpc server to serve raft and/or flash services.
-    FlashGrpcServerHolder flash_grpc_server_holder(this->context(), this->config(), raft_config, log);
-
     {
         TcpHttpServersHolder tcpHttpServersHolder(*this, settings, log);

@@ -1678,6 +1628,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
         auto metrics_prometheus = std::make_unique<MetricsPrometheus>(*global_context, async_metrics);

         SessionCleaner session_cleaner(*global_context);
+        auto & tmt_context = global_context->getTMTContext();

         if (proxy_conf.is_proxy_runnable)
         {
@@ -1792,6 +1743,65 @@ int Server::main(const std::vector<std::string> & /*args*/)
             GET_METRIC(tiflash_server_info, start_time).Set(ts.epochTime());
         }

+        // For test mode, TaskScheduler and LAC are controlled by the test case.
+        // TODO: resource control is not supported for WN, so disable pipeline model and LAC.
+        const bool init_pipeline_and_lac
+            = !global_context->isTest() && !global_context->getSharedContextDisagg()->isDisaggregatedStorageMode();
+        if (init_pipeline_and_lac)
+        {
+#ifdef DBMS_PUBLIC_GTEST
+            LocalAdmissionController::global_instance = std::make_unique<MockLocalAdmissionController>();
+#else
+            LocalAdmissionController::global_instance
+                = std::make_unique<LocalAdmissionController>(tmt_context.getKVCluster(), tmt_context.getEtcdClient());
+#endif
+
+            auto get_pool_size = [](const auto & setting) {
+                return setting == 0 ? getNumberOfLogicalCPUCores() : static_cast<size_t>(setting);
+            };
+            TaskSchedulerConfig config{
+                {get_pool_size(settings.pipeline_cpu_task_thread_pool_size),
+                 settings.pipeline_cpu_task_thread_pool_queue_type},
+                {get_pool_size(settings.pipeline_io_task_thread_pool_size),
+                 settings.pipeline_io_task_thread_pool_queue_type},
+            };
+            RUNTIME_CHECK(!TaskScheduler::instance);
+            TaskScheduler::instance = std::make_unique<TaskScheduler>(config);
+            LOG_INFO(log, "init pipeline task scheduler with {}", config.toString());
+        }
+
+        SCOPE_EXIT({
+            if (init_pipeline_and_lac)
+            {
+                assert(TaskScheduler::instance);
+                TaskScheduler::instance.reset();
+                assert(LocalAdmissionController::global_instance);
+                LocalAdmissionController::global_instance.reset();
+            }
+        });
+
+        if (settings.enable_async_grpc_client)
+        {
+            auto size = settings.grpc_completion_queue_pool_size;
+            if (size == 0)
+                size = std::thread::hardware_concurrency();
+            GRPCCompletionQueuePool::global_instance = std::make_unique<GRPCCompletionQueuePool>(size);
+        }
+
+        /// Start up the grpc server to serve raft and/or flash services.
+        FlashGrpcServerHolder flash_grpc_server_holder(this->context(), this->config(), raft_config, log);
+
+        SCOPE_EXIT({
+            // Stop LAC for an AutoScaler-managed CN before FlashGrpcServerHolder is destructed,
+            // because the AutoScaler kills the tiflash process once the port of flash_server_addr is down,
+            // and we want to make sure LAC is cleaned up before that happens.
+            // The effect is that there is no resource control between lac.stop() and the end of the
+            // FlashGrpcServer destruction, but that is acceptable: the gap is small (normally 100-200ms).
+            if (global_context->getSharedContextDisagg()->isDisaggregatedComputeMode() && use_autoscaler
+                && LocalAdmissionController::global_instance)
+                LocalAdmissionController::global_instance->stop();
+        });
+
         tmt_context.setStatusRunning();

         try
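
For reference, a minimal, self-contained sketch of the interruptible-sleep pattern that the ConfigReloader change adopts (illustrative names only, not TiFlash code): instead of `std::this_thread::sleep_for` plus a `quit` flag that is only observed once per interval, the worker blocks in `condition_variable::wait_for`, so the destructor can wake it immediately rather than waiting out the rest of the interval.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

class PeriodicWorker
{
public:
    PeriodicWorker() { thread = std::thread([this] { run(); }); }

    ~PeriodicWorker()
    {
        {
            // Flip quit under the mutex so the worker cannot check the
            // predicate and then block after the notification was sent.
            std::lock_guard guard(mutex);
            quit = true;
        }
        cv.notify_all();
        if (thread.joinable())
            thread.join(); // returns promptly, not after up to one interval
    }

private:
    void run()
    {
        std::unique_lock lock(mutex);
        while (true)
        {
            // wait_for returns true only when the predicate holds, i.e. quit
            // was requested; a plain timeout means "run another round".
            if (cv.wait_for(lock, interval, [this] { return quit.load(); }))
                return;
            doWork(); // e.g. reloadIfNewer(false, /* throw_on_error = */ false)
        }
    }

    void doWork() {}

    static constexpr auto interval = std::chrono::seconds(5);
    std::mutex mutex;
    std::condition_variable cv;
    std::atomic_bool quit{false};
    std::thread thread;
};
```

The patch itself stores to `quit` without holding `reload_mutex`; because the predicate re-checks the atomic on every timed wake, the worst case for that variant is one extra interval rather than a missed shutdown.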
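
On the `assert(!stopped)` changes in LocalAdmissionController: asserts compile out of release builds, so a call racing with shutdown would silently operate on a stopping LAC. The patch replaces them with explicit guards that take a conservative fallback path. A hedged sketch of the idea (hypothetical names, not the TiFlash sources):

```cpp
#include <atomic>
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical stand-in for HIGHEST_RESOURCE_GROUP_PRIORITY.
constexpr uint64_t kHighestPriority = 0;

struct Controller
{
    std::atomic_bool stopped{false};

    std::optional<uint64_t> getPriority(const std::string & name)
    {
        // An assert here would vanish in release builds; an explicit guard
        // keeps late callers on a safe, highest-priority fallback instead.
        if (stopped)
            return {kHighestPriority};
        if (name.empty())
            return {kHighestPriority};
        // ... normal per-group lookup elided in this sketch ...
        return {kHighestPriority};
    }
};
```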
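
The relocated shutdown hook relies on C++ destruction order: locals are destroyed in reverse declaration order, so a SCOPE_EXIT declared after `flash_grpc_server_holder` runs before the holder's destructor, and LAC is stopped while the gRPC port is still up. A minimal illustration with stand-in types (this is not the real SCOPE_EXIT macro):

```cpp
#include <cstdio>

// Tiny stand-in for a scope guard: runs a callback when the scope unwinds.
template <typename F>
struct ScopeExit
{
    F func;
    ~ScopeExit() { func(); }
};
template <typename F>
ScopeExit(F) -> ScopeExit<F>;

struct FlashGrpcServerHolder
{
    FlashGrpcServerHolder() { std::puts("grpc server up"); }
    ~FlashGrpcServerHolder() { std::puts("grpc server down"); }
};

int main()
{
    FlashGrpcServerHolder holder;                         // destroyed last
    ScopeExit stop_lac{[] { std::puts("LAC stopped"); }}; // destroyed first

    // Output:
    //   grpc server up
    //   LAC stopped
    //   grpc server down
    // LAC is stopped while the gRPC server is still serving, which is the
    // ordering the new SCOPE_EXIT in Server.cpp depends on.
}
```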