Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix background_pool_size not take effect and BackgroundProcessingPool::getThreadIds may misses some thread_ids. #4686

Merged
merged 5 commits into from
Apr 18, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ target_link_libraries (dbms
${RE2_ST_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
${BTRIE_LIBRARIES}
absl::synchronization
)

if (NOT USE_INTERNAL_RE2_LIBRARY)
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Encryption/RateLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,7 @@ void IORateLimiter::setBackgroundThreadIds(std::vector<pid_t> thread_ids)
{
std::lock_guard lock(bg_thread_ids_mtx);
bg_thread_ids.swap(thread_ids);
LOG_FMT_INFO(log, "bg_thread_ids {} => {}", bg_thread_ids.size(), bg_thread_ids);
}

std::pair<Int64, Int64> IORateLimiter::getReadWriteBytes(const std::string & fname [[maybe_unused]])
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/Interpreters/Context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1548,11 +1548,11 @@ FileProviderPtr Context::getFileProvider() const
return shared->file_provider;
}

void Context::initializeRateLimiter(Poco::Util::AbstractConfiguration & config)
void Context::initializeRateLimiter(Poco::Util::AbstractConfiguration & config, BackgroundProcessingPool & bg_pool, BackgroundProcessingPool & blockable_bg_pool) const
{
getIORateLimiter().init(config);
auto tids = getBackgroundPool().getThreadIds();
auto blockable_tids = getBlockableBackgroundPool().getThreadIds();
auto tids = bg_pool.getThreadIds();
auto blockable_tids = blockable_bg_pool.getThreadIds();
tids.insert(tids.end(), blockable_tids.begin(), blockable_tids.end());
getIORateLimiter().setBackgroundThreadIds(tids);
}
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/Context.h
Original file line number Diff line number Diff line change
Expand Up @@ -404,7 +404,7 @@ class Context
void initializeFileProvider(KeyManagerPtr key_manager, bool enable_encryption);
FileProviderPtr getFileProvider() const;

void initializeRateLimiter(Poco::Util::AbstractConfiguration & config);
void initializeRateLimiter(Poco::Util::AbstractConfiguration & config, BackgroundProcessingPool & bg_pool, BackgroundProcessingPool & blockable_bg_pool) const;
WriteLimiterPtr getWriteLimiter() const;
ReadLimiterPtr getReadLimiter() const;
IORateLimiter & getIORateLimiter() const;
Expand Down
25 changes: 16 additions & 9 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1215,8 +1215,22 @@ int Server::main(const std::vector<std::string> & /*args*/)
/// Init TiFlash metrics.
global_context->initializeTiFlashMetrics();

/// Init Rate Limiter
global_context->initializeRateLimiter(config());
/// Initialize users config reloader.
auto users_config_reloader = UserConfig::parseSettings(config(), config_path, global_context, log);

/// Load global settings from default_profile and system_profile.
/// It internally depends on UserConfig::parseSettings.
global_context->setDefaultProfiles(config());
Settings & settings = global_context->getSettingsRef();

JinheLin marked this conversation as resolved.
Show resolved Hide resolved
/// Initialize the background thread pool.
/// It internally depends on settings.background_pool_size,
/// so must be called after settings has been load.
auto & bg_pool = global_context->getBackgroundPool();
auto & blockable_bg_pool = global_context->getBlockableBackgroundPool();

/// Initialize RateLimiter.
global_context->initializeRateLimiter(config(), bg_pool, blockable_bg_pool);

/// Initialize main config reloader.
auto main_config_reloader = std::make_unique<ConfigReloader>(
Expand All @@ -1230,9 +1244,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
},
/* already_loaded = */ true);

/// Initialize users config reloader.
auto users_config_reloader = UserConfig::parseSettings(config(), config_path, global_context, log);

/// Reload config in SYSTEM RELOAD CONFIG query.
global_context->setConfigReloadCallback([&]() {
main_config_reloader->reload();
Expand All @@ -1254,10 +1265,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
bool use_l0_opt = config().getBool("l0_optimize", false);
global_context->setUseL0Opt(use_l0_opt);

/// Load global settings from default_profile and system_profile.
global_context->setDefaultProfiles(config());
Settings & settings = global_context->getSettingsRef();

/// Size of cache for marks (index of MergeTree family of tables). It is necessary.
size_t mark_cache_size = config().getUInt64("mark_cache_size", DEFAULT_MARK_CACHE_SIZE);
if (mark_cache_size)
Expand Down
27 changes: 22 additions & 5 deletions dbms/src/Storages/BackgroundProcessingPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,15 @@
#ifdef __linux__
#include <sys/syscall.h>
#include <unistd.h>
inline static pid_t gettid()
{
return syscall(SYS_gettid);
}
#elif
inline static pid_t gettid()
{
return -1;
}
#endif

namespace CurrentMetrics
Expand Down Expand Up @@ -76,6 +85,7 @@ void BackgroundProcessingPool::TaskInfo::wake()

BackgroundProcessingPool::BackgroundProcessingPool(int size_)
: size(size_)
, thread_ids_counter(size_)
{
LOG_FMT_INFO(&Poco::Logger::get("BackgroundProcessingPool"), "Create BackgroundProcessingPool with {} threads", size);

Expand Down Expand Up @@ -140,9 +150,7 @@ void BackgroundProcessingPool::threadFunction()
const auto name = "BkgPool" + std::to_string(tid++);
setThreadName(name.data());
is_background_thread = true;
#ifdef __linux__
addThreadId(syscall(SYS_gettid));
#endif
addThreadId(gettid());
}

MemoryTracker memory_tracker;
Expand Down Expand Up @@ -272,14 +280,23 @@ void BackgroundProcessingPool::threadFunction()

std::vector<pid_t> BackgroundProcessingPool::getThreadIds()
{
thread_ids_counter.Wait();
std::lock_guard lock(thread_ids_mtx);
if (thread_ids.size() != size)
{
LOG_FMT_ERROR(&Poco::Logger::get("BackgroundProcessingPool"), "thread_ids.size is {}, but {} is required", thread_ids.size(), size);
throw Exception("Background threads' number not match");
}
return thread_ids;
}

void BackgroundProcessingPool::addThreadId(pid_t tid)
{
std::lock_guard lock(thread_ids_mtx);
thread_ids.push_back(tid);
{
std::lock_guard lock(thread_ids_mtx);
thread_ids.push_back(tid);
}
thread_ids_counter.DecrementCount();
}

} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/Storages/BackgroundProcessingPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <Core/Types.h>
#include <Poco/Event.h>
#include <Poco/Timestamp.h>
#include <absl/synchronization/blocking_counter.h>

#include <atomic>
#include <condition_variable>
Expand Down Expand Up @@ -117,6 +118,7 @@ class BackgroundProcessingPool
Threads threads;
std::vector<pid_t> thread_ids; // Linux Thread ID
std::mutex thread_ids_mtx;
absl::BlockingCounter thread_ids_counter;

std::atomic<bool> shutdown{false};
std::condition_variable wake_event;
Expand Down
1 change: 1 addition & 0 deletions libs/libcommon/include/common/logger_useful.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <Poco/Logger.h>
#include <fmt/format.h>
#include <fmt/ranges.h>

#ifndef QUERY_PREVIEW_LENGTH
#define QUERY_PREVIEW_LENGTH 160
Expand Down