Skip to content

Commit

Permalink
Refactor initialize of background pool (#5190)
Browse files Browse the repository at this point in the history
close #5189
  • Loading branch information
Lloyd-Pottiger authored Jul 11, 2022
1 parent 4619605 commit 1388c11
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 24 deletions.
26 changes: 18 additions & 8 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,18 +68,16 @@
#include <fmt/core.h>

#include <boost/functional/hash/hash.hpp>
#include <map>
#include <pcg_random.hpp>
#include <set>
#include <unordered_map>


namespace ProfileEvents
{
extern const Event ContextLock;
}

#include <set>

namespace CurrentMetrics
{
extern const Metric GlobalStorageRunMode;
Expand Down Expand Up @@ -1440,20 +1438,32 @@ void Context::dropCaches() const
shared->mark_cache->reset();
}

BackgroundProcessingPool & Context::getBackgroundPool()
BackgroundProcessingPool & Context::initializeBackgroundPool(UInt16 pool_size)
{
auto lock = getLock();
if (!shared->background_pool)
shared->background_pool = std::make_shared<BackgroundProcessingPool>(settings.background_pool_size);
shared->background_pool = std::make_shared<BackgroundProcessingPool>(pool_size);
return *shared->background_pool;
}

BackgroundProcessingPool & Context::getBlockableBackgroundPool()
BackgroundProcessingPool & Context::getBackgroundPool()
{
auto lock = getLock();
return *shared->background_pool;
}

BackgroundProcessingPool & Context::initializeBlockableBackgroundPool(UInt16 pool_size)
{
// TODO: choose a better thread pool size and maybe a better name for the pool
auto lock = getLock();
if (!shared->blockable_background_pool)
shared->blockable_background_pool = std::make_shared<BackgroundProcessingPool>(settings.background_pool_size);
shared->blockable_background_pool = std::make_shared<BackgroundProcessingPool>(pool_size);
return *shared->blockable_background_pool;
}

BackgroundProcessingPool & Context::getBlockableBackgroundPool()
{
// TODO: maybe a better name for the pool
auto lock = getLock();
return *shared->blockable_background_pool;
}

Expand Down
4 changes: 3 additions & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,9 @@ class Context
void setUseL0Opt(bool use_l0_opt);
bool useL0Opt() const;

BackgroundProcessingPool & initializeBackgroundPool(UInt16 pool_size);
BackgroundProcessingPool & getBackgroundPool();
BackgroundProcessingPool & initializeBlockableBackgroundPool(UInt16 pool_size);
BackgroundProcessingPool & getBlockableBackgroundPool();

void createTMTContext(const TiFlashRaftConfig & raft_config, pingcap::ClusterConfig && cluster_config);
Expand Down Expand Up @@ -505,7 +507,7 @@ class DDLGuard
class SessionCleaner
{
public:
SessionCleaner(Context & context_)
explicit SessionCleaner(Context & context_)
: context{context_}
{}
~SessionCleaner();
Expand Down
24 changes: 11 additions & 13 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,15 @@
#include <Poco/Net/NetException.h>
#include <Poco/StringTokenizer.h>
#include <Poco/Timestamp.h>
#include <Server/HTTPHandlerFactory.h>
#include <Server/MetricsPrometheus.h>
#include <Server/MetricsTransmitter.h>
#include <Server/RaftConfigParser.h>
#include <Server/Server.h>
#include <Server/ServerInfo.h>
#include <Server/StatusFile.h>
#include <Server/StorageConfigParser.h>
#include <Server/TCPHandlerFactory.h>
#include <Server/UserConfigParser.h>
#include <Storages/FormatVersion.h>
#include <Storages/IManageableStorage.h>
Expand All @@ -82,12 +87,6 @@
#include <limits>
#include <memory>

#include "HTTPHandlerFactory.h"
#include "MetricsPrometheus.h"
#include "MetricsTransmitter.h"
#include "StatusFile.h"
#include "TCPHandlerFactory.h"

#if Poco_NetSSL_FOUND
#include <Poco/Net/Context.h>
#include <Poco/Net/SecureServerSocket.h>
Expand Down Expand Up @@ -1144,6 +1143,12 @@ int Server::main(const std::vector<std::string> & /*args*/)
global_context->getPathCapacity(),
global_context->getFileProvider());

/// Initialize the background & blockable background thread pool.
Settings & settings = global_context->getSettingsRef();
LOG_FMT_INFO(log, "Background & Blockable Background pool size: {}", settings.background_pool_size);
auto & bg_pool = global_context->initializeBackgroundPool(settings.background_pool_size);
auto & blockable_bg_pool = global_context->initializeBlockableBackgroundPool(settings.background_pool_size);

global_context->initializePageStorageMode(global_context->getPathPool(), STORAGE_FORMAT_CURRENT.page);
global_context->initializeGlobalStoragePoolIfNeed(global_context->getPathPool());
LOG_FMT_INFO(log, "Global PageStorage run mode is {}", static_cast<UInt8>(global_context->getPageStorageRunMode()));
Expand Down Expand Up @@ -1260,13 +1265,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Load global settings from default_profile and system_profile.
/// It internally depends on UserConfig::parseSettings.
global_context->setDefaultProfiles(config());
Settings & settings = global_context->getSettingsRef();

/// Initialize the background thread pool.
/// It internally depends on settings.background_pool_size,
/// so must be called after settings has been load.
auto & bg_pool = global_context->getBackgroundPool();
auto & blockable_bg_pool = global_context->getBlockableBackgroundPool();

/// Initialize RateLimiter.
global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool);
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Storages/BackgroundProcessingPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class BackgroundProcessingPool
using TaskHandle = std::shared_ptr<TaskInfo>;


BackgroundProcessingPool(int size_);
explicit BackgroundProcessingPool(int size_);

size_t getNumberOfThreads() const { return size; }

Expand All @@ -96,7 +96,7 @@ class BackgroundProcessingPool
/// 2. thread B also get the same task
/// 3. thread A finish the execution of the task quickly, release the task and try to update the next schedule time of the task
/// 4. thread B find the task is not occupied and execute the task again almost immediately
TaskHandle addTask(const Task & task, const bool multi = true, const size_t interval_ms = 0);
TaskHandle addTask(const Task & task, bool multi = true, size_t interval_ms = 0);
void removeTask(const TaskHandle & task);

~BackgroundProcessingPool();
Expand Down
5 changes: 5 additions & 0 deletions dbms/src/TestUtils/TiFlashTestEnv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,11 @@ void TiFlashTestEnv::initializeGlobalContext(Strings testdata_path, PageStorageR
KeyManagerPtr key_manager = std::make_shared<MockKeyManager>(false);
global_context->initializeFileProvider(key_manager, false);

// initialize background & blockable background thread pool
Settings & settings = global_context->getSettingsRef();
global_context->initializeBackgroundPool(settings.background_pool_size);
global_context->initializeBlockableBackgroundPool(settings.background_pool_size);

// Theses global variables should be initialized by the following order
// 1. capacity
// 2. path pool
Expand Down

0 comments on commit 1388c11

Please sign in to comment.