Skip to content

Commit

Permalink
Merge branch 'fuzhe/dynamic_thread_pool' of github.com:fuzhe1989/tics…
Browse files Browse the repository at this point in the history
… into fuzhe/dynamic_thread_pool
  • Loading branch information
fuzhe1989 committed Dec 28, 2021
2 parents 28077a8 + 959cea7 commit c879bbb
Show file tree
Hide file tree
Showing 20 changed files with 381 additions and 318 deletions.
30 changes: 15 additions & 15 deletions dbms/src/Common/CPUAffinityManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ void CPUAffinityManager::bindSelfQueryThread() const
{
if (enable())
{
LOG_INFO(log, "Thread: " << ::getThreadName() << " bindQueryThread.");
LOG_FMT_INFO(log, "Thread: {} bindQueryThread.", ::getThreadName());
// If tid is zero, then the calling thread is used.
bindQueryThread(0);
}
Expand All @@ -120,7 +120,7 @@ void CPUAffinityManager::bindSelfOtherThread() const
{
if (enable())
{
LOG_INFO(log, "Thread: " << ::getThreadName() << " bindOtherThread.");
LOG_FMT_INFO(log, "Thread: {} bindOtherThread.", ::getThreadName());
// If tid is zero, then the calling thread is used.
bindOtherThread(0);
}
Expand Down Expand Up @@ -183,7 +183,7 @@ void CPUAffinityManager::setAffinity(pid_t tid, const cpu_set_t & cpu_set) const
int ret = sched_setaffinity(tid, sizeof(cpu_set), &cpu_set);
if (ret != 0)
{
LOG_ERROR(log, "sched_setaffinity fail but ignore error: " << std::strerror(errno));
LOG_FMT_ERROR(log, "sched_setaffinity fail but ignore error: {}", std::strerror(errno));
}
}

Expand All @@ -192,7 +192,7 @@ bool CPUAffinityManager::enable() const
return 0 < query_cpu_percent && query_cpu_percent < 100 && cpu_cores > 1;
}

std::string CPUAffinityManager::cpuSetToString(const cpu_set_t & cpu_set) const
std::string CPUAffinityManager::cpuSetToString(const cpu_set_t & cpu_set)
{
auto v = cpuSetToVec(cpu_set);
std::string s;
Expand All @@ -203,7 +203,7 @@ std::string CPUAffinityManager::cpuSetToString(const cpu_set_t & cpu_set) const
return s;
}

std::vector<int> CPUAffinityManager::cpuSetToVec(const cpu_set_t & cpu_set) const
std::vector<int> CPUAffinityManager::cpuSetToVec(const cpu_set_t & cpu_set)
{
std::vector<int> v;
for (int i = 0; i < static_cast<int>(sizeof(cpu_set)); i++)
Expand All @@ -217,7 +217,7 @@ std::vector<int> CPUAffinityManager::cpuSetToVec(const cpu_set_t & cpu_set) cons
}

// /proc/17022/task/17022 -> 17022
std::string CPUAffinityManager::getShortFilename(const std::string & path) const
std::string CPUAffinityManager::getShortFilename(const std::string & path)
{
auto pos = path.find_last_of('/');
if (pos == std::string::npos)
Expand All @@ -243,13 +243,13 @@ std::vector<pid_t> CPUAffinityManager::getThreadIDs(const std::string & dir) con
}
catch (std::exception & e)
{
LOG_ERROR(log, "dir " << dir << " path " << iter->path() << " exception " << e.what());
LOG_FMT_ERROR(log, "dir {} path {} exception {}", dir, iter->path(), e.what());
}
}
return tids;
}

std::string CPUAffinityManager::getThreadName(const std::string & fname) const
std::string CPUAffinityManager::getThreadName(const std::string & fname)
{
std::ifstream ifs(fname);
if (ifs.fail())
Expand All @@ -265,7 +265,7 @@ std::unordered_map<pid_t, std::string> CPUAffinityManager::getThreads(pid_t pid)
{
std::string task_dir = "/proc/" + std::to_string(pid) + "/task";
auto tids = getThreadIDs(task_dir);
LOG_DEBUG(log, task_dir << " thread count " << tids.size());
LOG_FMT_DEBUG(log, "{} thread count {}", task_dir, tids.size());
std::unordered_map<pid_t, std::string> threads;
for (auto tid : tids)
{
Expand All @@ -286,12 +286,12 @@ void CPUAffinityManager::bindThreadCPUAffinity() const
{
if (isQueryThread(t.second))
{
LOG_INFO(log, "Thread: " << t.first << " " << t.second << " bindQueryThread.");
LOG_FMT_INFO(log, "Thread: {} {} bindQueryThread.", t.first, t.second);
bindQueryThread(t.first);
}
else
{
LOG_INFO(log, "Thread: " << t.first << " " << t.second << " bindOtherThread.");
LOG_FMT_INFO(log, "Thread: {} {} bindOtherThread.", t.first, t.second);
bindOtherThread(t.first);
}
}
Expand All @@ -309,17 +309,17 @@ void CPUAffinityManager::checkThreadCPUAffinity() const
int ret = sched_getaffinity(t.first, sizeof(cpu_set), &cpu_set);
if (ret != 0)
{
LOG_ERROR(log, "Thread: " << t.first << " " << t.second << " sched_getaffinity ret " << ret << " error " << strerror(errno));
LOG_FMT_ERROR(log, "Thread: {} {} sched_getaffinity ret {} error {}", t.first, t.second, ret, strerror(errno));
continue;
}
LOG_INFO(log, "Thread: " << t.first << " " << t.second << " bind on CPU: " << cpuSetToString(cpu_set));
LOG_FMT_INFO(log, "Thread: {} {} bind on CPU: {}", t.first, t.second, cpuSetToString(cpu_set));
if (isQueryThread(t.second) && !CPU_EQUAL(&cpu_set, &query_cpu_set))
{
LOG_ERROR(log, "Thread: " << t.first << " " << t.second << " is query thread and bind CPU info is error.");
LOG_FMT_ERROR(log, "Thread: {} {} is query thread and bind CPU info is error.", t.first, t.second);
}
else if (!isQueryThread(t.second) && !CPU_EQUAL(&cpu_set, &other_cpu_set))
{
LOG_ERROR(log, "Thread: " << t.first << " " << t.second << " is other thread and bind CPU info is error.");
LOG_FMT_ERROR(log, "Thread: {} {} is other thread and bind CPU info is error.", t.first, t.second);
}
}
}
Expand Down
12 changes: 7 additions & 5 deletions dbms/src/Common/CPUAffinityManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,19 +80,20 @@ class CPUAffinityManager
int getCPUCores() const;
int getQueryCPUCores() const;
int getOtherCPUCores() const;
void initCPUSet(cpu_set_t & cpu_set, int start, int count);
static void initCPUSet(cpu_set_t & cpu_set, int start, int count);
void checkThreadCPUAffinity() const;
// Bind thread t on cpu_set.
void setAffinity(pid_t tid, const cpu_set_t & cpu_set) const;
bool enable() const;

std::string cpuSetToString(const cpu_set_t & cpu_set) const;
std::vector<int> cpuSetToVec(const cpu_set_t & cpu_set) const;
static std::string cpuSetToString(const cpu_set_t & cpu_set);
static std::vector<int> cpuSetToVec(const cpu_set_t & cpu_set);


std::unordered_map<pid_t, std::string> getThreads(pid_t pid) const;
std::vector<pid_t> getThreadIDs(const std::string & dir) const;
std::string getThreadName(const std::string & fname) const;
std::string getShortFilename(const std::string & path) const;
static std::string getThreadName(const std::string & fname);
static std::string getShortFilename(const std::string & path);
bool isQueryThread(const std::string & name) const;

cpu_set_t query_cpu_set;
Expand All @@ -106,6 +107,7 @@ class CPUAffinityManager

CPUAffinityManager();
// Disable copy and move
public:
CPUAffinityManager(const CPUAffinityManager &) = delete;
CPUAffinityManager & operator=(const CPUAffinityManager &) = delete;
CPUAffinityManager(CPUAffinityManager &&) = delete;
Expand Down
7 changes: 5 additions & 2 deletions dbms/src/Common/TiFlashSecurity.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,9 +66,12 @@ struct TiFlashSecurityConfig
else
{
has_tls_config = true;
LOG_INFO(
LOG_FMT_INFO(
log,
"security config is set: ca path is " << ca_path << " cert path is " << cert_path << " key path is " << key_path);
"security config is set: ca path is {} cert path is {} key path is {}",
ca_path,
cert_path,
key_path);
}

if (config.has("security.cert_allowed_cn") && has_tls_config)
Expand Down
48 changes: 26 additions & 22 deletions dbms/src/DataStreams/CollapsingSortedBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,42 +1,45 @@
#include <Columns/ColumnsNumber.h>
#include <Common/FieldVisitors.h>
#include <Common/FmtUtils.h>
#include <DataStreams/CollapsingSortedBlockInputStream.h>
#include <Columns/ColumnsNumber.h>
#include <common/logger_useful.h>
#include <fmt/core.h>

/// Maximum number of messages about incorrect data in the log.
#define MAX_ERROR_MESSAGES 10


namespace DB
{

namespace ErrorCodes
{
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
}
extern const int INCORRECT_DATA;
extern const int LOGICAL_ERROR;
} // namespace ErrorCodes


void CollapsingSortedBlockInputStream::reportIncorrectData()
{
std::stringstream s;
s << "Incorrect data: number of rows with sign = 1 (" << count_positive
<< ") differs with number of rows with sign = -1 (" << count_negative
<< ") by more than one (for key: ";

for (size_t i = 0, size = current_key.size(); i < size; ++i)
{
if (i != 0)
s << ", ";
s << applyVisitor(FieldVisitorToString(), (*(*current_key.columns)[i])[current_key.row_num]);
}

s << ").";
FmtBuffer fmt_buf;
fmt_buf.fmtAppend(
"Incorrect data: number of rows with sign = 1 ({}) differs with number of rows with sign = -1 ({}) by more than one (for key: ",
count_positive,
count_negative);

fmt_buf.joinStr(
std::begin(*current_key.columns),
std::end(*current_key.columns),
[this](const auto arg, FmtBuffer & fb) {
fb.append(applyVisitor(FieldVisitorToString(), (*arg)[current_key.row_num]));
},
", ");
fmt_buf.append(").");

/** Fow now we limit ourselves to just logging such situations,
* since the data is generated by external programs.
* With inconsistent data, this is an unavoidable error that can not be easily corrected by admins. Therefore Warning.
*/
LOG_WARNING(log, s.rdbuf());
LOG_WARNING(log, fmt_buf.toString());
}


Expand Down Expand Up @@ -112,7 +115,7 @@ Block CollapsingSortedBlockInputStream::readImpl()
init(merged_columns);

if (has_collation)
throw Exception("Logical error: " + getName() + " does not support collations", ErrorCodes::LOGICAL_ERROR);
throw Exception(fmt::format("Logical error: {} does not support collations", getName()), ErrorCodes::LOGICAL_ERROR);

if (merged_columns.empty())
return {};
Expand Down Expand Up @@ -195,7 +198,8 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
last_is_positive = false;
}
else
throw Exception("Incorrect data: Sign = " + toString(sign) + " (must be 1 or -1).",
throw Exception(
fmt::format("Incorrect data: Sign = {} (must be 1 or -1).", sign),
ErrorCodes::INCORRECT_DATA);

if (!current->isLast())
Expand All @@ -216,4 +220,4 @@ void CollapsingSortedBlockInputStream::merge(MutableColumns & merged_columns, st
finished = true;
}

}
} // namespace DB
6 changes: 3 additions & 3 deletions dbms/src/DataStreams/MergeSortingBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,10 +121,10 @@ Block MergeSortingBlockInputStream::readImpl()
NativeBlockOutputStream block_out(compressed_buf, 0, header_without_constants);
MergeSortingBlocksBlockInputStream block_in(blocks, description, log, max_merged_block_size, limit);

LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
LOG_FMT_INFO(log, "Sorting and writing part of data into temporary file {}", path);
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
copyData(block_in, block_out, &is_cancelled); /// NOTE. Possibly limit disk usage.
LOG_INFO(log, "Done writing part of data into temporary file " + path);
LOG_FMT_INFO(log, "Done writing part of data into temporary file {}", path);

blocks.clear();
sum_bytes_in_blocks = 0;
Expand All @@ -143,7 +143,7 @@ Block MergeSortingBlockInputStream::readImpl()
/// If there was temporary files.
ProfileEvents::increment(ProfileEvents::ExternalSortMerge);

LOG_INFO(log, "There are " << temporary_files.size() << " temporary sorted parts to merge.");
LOG_FMT_INFO(log, "There are {} temporary sorted parts to merge.", temporary_files.size());

/// Create sorted streams to merge.
for (const auto & file : temporary_files)
Expand Down
21 changes: 12 additions & 9 deletions dbms/src/Databases/DatabaseOrdinary.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include <Poco/DirectoryIterator.h>
#include <common/ThreadPool.h>
#include <common/logger_useful.h>
#include <fmt/core.h>


namespace DB
Expand Down Expand Up @@ -79,7 +80,7 @@ void DatabaseOrdinary::loadTables(Context & context, ThreadPool * thread_pool, b
std::sort(file_names.begin(), file_names.end());

size_t total_tables = file_names.size();
LOG_INFO(log, "Total " << total_tables << " tables.");
LOG_FMT_INFO(log, "Total {} tables.", total_tables);

String data_path = context.getPath() + "data/" + escapeForFileName(name) + "/";

Expand All @@ -94,7 +95,7 @@ void DatabaseOrdinary::loadTables(Context & context, ThreadPool * thread_pool, b
/// Messages, so that it's not boring to wait for the server to load for a long time.
if ((++tables_processed) % PRINT_MESSAGE_EACH_N_TABLES == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, std::fixed << std::setprecision(2) << tables_processed * 100.0 / total_tables << "%");
LOG_FMT_INFO(log, "{:.2f}%", tables_processed * 100.0 / total_tables);
watch.restart();
}

Expand All @@ -110,7 +111,9 @@ void DatabaseOrdinary::loadTables(Context & context, ThreadPool * thread_pool, b
auto begin = file_names.begin() + i * bunch_size;
auto end = (i + 1 == num_bunches) ? file_names.end() : (file_names.begin() + (i + 1) * bunch_size);

auto task = std::bind(task_function, begin, end);
auto task = [task_function, begin, end] {
return task_function(begin, end);
};

if (thread_pool)
thread_pool->schedule(task);
Expand Down Expand Up @@ -144,7 +147,7 @@ void DatabaseOrdinary::createTable(const Context & context, const String & table
{
std::lock_guard<std::mutex> lock(mutex);
if (tables.find(table_name) != tables.end())
throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception(fmt::format("Table {}.{} already exists.", name, table_name), ErrorCodes::TABLE_ALREADY_EXISTS);
}

String table_metadata_path = getTableMetadataPath(table_name);
Expand All @@ -169,7 +172,7 @@ void DatabaseOrdinary::createTable(const Context & context, const String & table
{
std::lock_guard<std::mutex> lock(mutex);
if (!tables.emplace(table_name, table).second)
throw Exception("Table " + name + "." + table_name + " already exists.", ErrorCodes::TABLE_ALREADY_EXISTS);
throw Exception(fmt::format("Table {}.{} already exists.", name, table_name), ErrorCodes::TABLE_ALREADY_EXISTS);
}

context.getFileProvider()->renameFile(table_metadata_tmp_path, EncryptionPath(table_metadata_tmp_path, ""), table_metadata_path, EncryptionPath(table_metadata_path, ""), true);
Expand Down Expand Up @@ -217,13 +220,13 @@ void DatabaseOrdinary::renameTable(
StoragePtr table = tryGetTable(context, table_name);

if (!table)
throw Exception("Table " + name + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
throw Exception(fmt::format("Table {}.{} doesn't exist.", name, table_name), ErrorCodes::UNKNOWN_TABLE);

/// Notify the table that it is renamed. If the table does not support renaming, exception is thrown.
try
{
table->rename(
context.getPath() + "/data/" + escapeForFileName(to_database_concrete->name) + "/",
fmt::format("{}/data/{}/", context.getPath(), escapeForFileName(to_database_concrete->name)),
to_database_concrete->name,
to_table_name);
}
Expand All @@ -242,7 +245,7 @@ void DatabaseOrdinary::renameTable(

ASTPtr ast = DatabaseLoading::getQueryFromMetadata(context, detail::getTableMetadataPath(metadata_path, table_name));
if (!ast)
throw Exception("There is no metadata file for table " + table_name, ErrorCodes::FILE_DOESNT_EXIST);
throw Exception(fmt::format("There is no metadata file for table {}", table_name), ErrorCodes::FILE_DOESNT_EXIST);
ASTCreateQuery & ast_create_query = typeid_cast<ASTCreateQuery &>(*ast);
ast_create_query.table = to_table_name;

Expand Down Expand Up @@ -281,7 +284,7 @@ ASTPtr DatabaseOrdinary::getCreateTableQueryImpl(const Context & context, const

auto msg = has_table ? "There is no CREATE TABLE query for table " : "There is no metadata file for table ";

throw Exception(msg + table_name, ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY);
throw Exception(fmt::format("{}{}", msg, table_name), ErrorCodes::CANNOT_GET_CREATE_TABLE_QUERY);
}

return ast;
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Databases/DatabaseTiFlash.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ void DatabaseTiFlash::loadTables(Context & context, ThreadPool * thread_pool, bo
std::sort(table_files.begin(), table_files.end());

const auto total_tables = table_files.size();
LOG_INFO(log, "Total " << total_tables << " tables in database " << name);
LOG_FMT_INFO(log, "Total {} tables in database {}", total_tables, name);

AtomicStopwatch watch;
std::atomic<size_t> tables_processed{0};
Expand All @@ -114,7 +114,7 @@ void DatabaseTiFlash::loadTables(Context & context, ThreadPool * thread_pool, bo
/// Messages, so that it's not boring to wait for the server to load for a long time.
if ((++tables_processed) % PRINT_MESSAGE_EACH_N_TABLES == 0 || watch.compareAndRestart(PRINT_MESSAGE_EACH_N_SECONDS))
{
LOG_INFO(log, DB::toString(tables_processed * 100.0 / total_tables, 2) << "%");
LOG_FMT_INFO(log, "{:.2f}%", tables_processed * 100.0 / total_tables);
watch.restart();
}

Expand Down
Loading

0 comments on commit c879bbb

Please sign in to comment.