Skip to content

Commit

Permalink
Metrics of threads (#4169)
Browse files Browse the repository at this point in the history
* add threads metrics

* add threads metrics

Signed-off-by: bestwoody <[email protected]>

* format

Signed-off-by: bestwoody <[email protected]>

* update threadpool into thread_pool

Signed-off-by: bestwoody <[email protected]>

* UPDATE_CUR_AND_MAX_METRIC

Signed-off-by: bestwoody <[email protected]>

* refine include

Signed-off-by: bestwoody <[email protected]>
  • Loading branch information
bestwoody authored Mar 3, 2022
1 parent d321064 commit 05f5628
Show file tree
Hide file tree
Showing 9 changed files with 488 additions and 4 deletions.
15 changes: 12 additions & 3 deletions dbms/src/Common/DynamicThreadPool.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <Common/DynamicThreadPool.h>
#include <Common/TiFlashMetrics.h>

namespace DB
{
Expand Down Expand Up @@ -80,24 +81,32 @@ void DynamicThreadPool::scheduledToNewDynamicThread(TaskPtr & task)
t.detach();
}

/// Executes `task` while accounting for it in the thread-pool "active threads" gauges.
/// UPDATE_CUR_AND_MAX_METRIC increments the active gauge, raises the matching max gauge if the
/// new value exceeds it, and registers a SCOPE_EXIT that decrements the active gauge again.
void executeTask(const std::unique_ptr<IExecutableTask> & task)
{
UPDATE_CUR_AND_MAX_METRIC(tiflash_thread_count, type_active_threads_of_thdpool, type_max_active_threads_of_thdpool);
task->execute();
}

/// Work loop of the fixed thread that serves `fixed_queues[index]`.
/// The thread is counted in the thread-pool "total threads" gauges for the duration of this call
/// (via UPDATE_CUR_AND_MAX_METRIC), then it keeps popping tasks from its queue until it
/// receives an empty task.
void DynamicThreadPool::fixedWork(size_t index)
{
UPDATE_CUR_AND_MAX_METRIC(tiflash_thread_count, type_total_threads_of_thdpool, type_max_threads_of_thdpool);
Queue * queue = fixed_queues[index].get();
while (true)
{
TaskPtr task;
queue->pop(task);
// An empty task terminates the loop (presumably the shutdown signal -- confirm with the producer side).
if (!task)
break;
// NOTE(review): both the direct call and executeTask() appear below. This looks like the
// removed/added line pair of a diff rendered without +/- markers; if both were live the
// task would be executed twice. Confirm that only executeTask(task) is intended.
task->execute();
executeTask(task);

// Hand the queue back to the idle list (presumably so the scheduler can route another task to it).
idle_fixed_queues.push(queue);
}
}

void DynamicThreadPool::dynamicWork(TaskPtr initial_task)
{
initial_task->execute();
UPDATE_CUR_AND_MAX_METRIC(tiflash_thread_count, type_total_threads_of_thdpool, type_max_threads_of_thdpool);
executeTask(initial_task);

DynamicNode node;
while (true)
Expand All @@ -114,7 +123,7 @@ void DynamicThreadPool::dynamicWork(TaskPtr initial_task)

if (!node.task) // may be timeout or cancelled
break;
node.task->execute();
executeTask(node.task);
node.task.reset();
}
alive_dynamic_threads.fetch_sub(1);
Expand Down
4 changes: 4 additions & 0 deletions dbms/src/Common/Stopwatch.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@ inline UInt64 nanoseconds(clockid_t clock_type)
clock_gettime(clock_type, &ts);
return ts.tv_sec * 1000000000ULL + ts.tv_nsec;
}
/// Reads the clock identified by `clock_type` through nanoseconds() and
/// returns the reading truncated to whole seconds.
inline UInt64 seconds(clockid_t clock_type)
{
    constexpr UInt64 nanos_per_second = 1000000000ULL;
    const UInt64 reading_ns = nanoseconds(clock_type);
    return reading_ns / nanos_per_second;
}
} // namespace StopWatchDetail


Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Common/ThreadFactory.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <Common/MemoryTrackerSetter.h>
#include <Common/TiFlashMetrics.h>
#include <Common/setThreadName.h>
#include <common/ThreadPool.h>

Expand All @@ -22,6 +23,7 @@ class ThreadFactory
{
auto memory_tracker = current_memory_tracker;
auto wrapped_func = [propagate_memory_tracker, memory_tracker, thread_name = std::move(thread_name), f = std::move(f)](auto &&... args) {
UPDATE_CUR_AND_MAX_METRIC(tiflash_thread_count, type_total_threads_of_raw, type_max_threads_of_raw);
MemoryTrackerSetter setter(propagate_memory_tracker, memory_tracker);
if (!thread_name.empty())
setThreadName(thread_name.c_str());
Expand Down
26 changes: 26 additions & 0 deletions dbms/src/Common/ThreadMetricUtil.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
#include <Common/Stopwatch.h>
#include <Common/ThreadMetricUtil.h>
#include <Common/TiFlashMetrics.h>
#include <common/types.h>

std::atomic<UInt64> last_max_thds_metric_reset_ts{0};
const UInt64 max_thds_metric_reset_interval = 60; //60s

namespace DB
{
bool tryToResetMaxThreadsMetrics()
{
UInt64 now_ts = StopWatchDetail::seconds(CLOCK_MONOTONIC);
if (now_ts > last_max_thds_metric_reset_ts + max_thds_metric_reset_interval)
{
last_max_thds_metric_reset_ts = now_ts;
GET_METRIC(tiflash_thread_count, type_max_threads_of_dispatch_mpp).Set(GET_METRIC(tiflash_thread_count, type_active_threads_of_dispatch_mpp).Value());
GET_METRIC(tiflash_thread_count, type_max_threads_of_establish_mpp).Set(GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Value());
GET_METRIC(tiflash_thread_count, type_max_threads_of_raw).Set(GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Value());
GET_METRIC(tiflash_thread_count, type_max_active_threads_of_thdpool).Set(GET_METRIC(tiflash_thread_count, type_active_threads_of_thdpool).Value());
GET_METRIC(tiflash_thread_count, type_max_threads_of_thdpool).Set(GET_METRIC(tiflash_thread_count, type_total_threads_of_thdpool).Value());
return true;
}
return false;
}
} // namespace DB
6 changes: 6 additions & 0 deletions dbms/src/Common/ThreadMetricUtil.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
#pragma once

namespace DB
{
/// Resets the "max" gauges of tiflash_thread_count to the current value of their corresponding
/// gauges when more than the reset interval (60 seconds, see ThreadMetricUtil.cpp) has passed
/// since the previous reset.
/// Returns true if the reset was performed by this call, false otherwise.
bool tryToResetMaxThreadsMetrics();
}
21 changes: 20 additions & 1 deletion dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
#include <prometheus/histogram.h>
#include <prometheus/registry.h>

#include <ext/scope_guard.h>

// to make GCC 11 happy
#include <cassert>

Expand Down Expand Up @@ -151,7 +153,18 @@ namespace DB
M(tiflash_raft_write_data_to_storage_duration_seconds, "Bucketed histogram of writting region into storage layer", Histogram, \
F(type_decode, {{"type", "decode"}}, ExpBuckets{0.0005, 2, 20}), F(type_write, {{"type", "write"}}, ExpBuckets{0.0005, 2, 20})) \
M(tiflash_server_info, "Indicate the tiflash server info, and the value is the start timestamp (s).", Gauge, \
F(start_time, {"version", TiFlashBuildInfo::getReleaseVersion()}, {"hash", TiFlashBuildInfo::getGitHash()}))
F(start_time, {"version", TiFlashBuildInfo::getReleaseVersion()}, {"hash", TiFlashBuildInfo::getGitHash()})) \
M(tiflash_thread_count, "Number of threads", Gauge, \
F(type_max_threads_of_thdpool, {"type", "thread_pool_total_max"}), \
F(type_active_threads_of_thdpool, {"type", "thread_pool_active"}), \
F(type_max_active_threads_of_thdpool, {"type", "thread_pool_active_max"}), \
F(type_total_threads_of_thdpool, {"type", "thread_pool_total"}), \
F(type_max_threads_of_raw, {"type", "total_max"}), \
F(type_total_threads_of_raw, {"type", "total"}), \
F(type_max_threads_of_establish_mpp, {"type", "rpc_establish_mpp_max"}), \
F(type_active_threads_of_establish_mpp, {"type", "rpc_establish_mpp"}), \
F(type_max_threads_of_dispatch_mpp, {"type", "rpc_dispatch_mpp_max"}), \
F(type_active_threads_of_dispatch_mpp, {"type", "rpc_dispatch_mpp"}))
// clang-format on

struct ExpBuckets
Expand Down Expand Up @@ -318,4 +331,10 @@ APPLY_FOR_METRICS(MAKE_METRIC_ENUM_M, MAKE_METRIC_ENUM_F)
__GET_METRIC_MACRO(__VA_ARGS__, __GET_METRIC_1, __GET_METRIC_0) \
(__VA_ARGS__)

/// Increments gauge `metric` of `family`, raises gauge `metric_max` to at least the new value of
/// `metric`, and registers a SCOPE_EXIT that decrements `metric` again.
/// The macro expands to several statements that are not wrapped in a block (presumably so that
/// the SCOPE_EXIT is tied to the caller's scope), so do not use it as the unbraced body of an
/// if/for/while.
/// NOTE(review): reading `metric_max` and then calling Set() is not one atomic step; concurrent
/// callers could overwrite a larger maximum with a smaller one -- confirm this is acceptable.
#define UPDATE_CUR_AND_MAX_METRIC(family, metric, metric_max) \
GET_METRIC(family, metric).Increment(); \
GET_METRIC(family, metric_max).Set(std::max(GET_METRIC(family, metric_max).Value(), GET_METRIC(family, metric).Value())); \
SCOPE_EXIT({ \
GET_METRIC(family, metric).Decrement(); \
})
} // namespace DB
20 changes: 20 additions & 0 deletions dbms/src/Flash/FlashService.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#include <Common/CPUAffinityManager.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadMetricUtil.h>
#include <Common/TiFlashMetrics.h>
#include <Common/setThreadName.h>
#include <Core/Types.h>
Expand Down Expand Up @@ -140,8 +141,18 @@ ::grpc::Status FlashService::DispatchMPPTask(
}
GET_METRIC(tiflash_coprocessor_request_count, type_dispatch_mpp_task).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_dispatch_mpp_task).Increment();
GET_METRIC(tiflash_thread_count, type_active_threads_of_dispatch_mpp).Increment();
GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Increment();
if (!tryToResetMaxThreadsMetrics())
{
GET_METRIC(tiflash_thread_count, type_max_threads_of_dispatch_mpp).Set(std::max(GET_METRIC(tiflash_thread_count, type_max_threads_of_dispatch_mpp).Value(), GET_METRIC(tiflash_thread_count, type_active_threads_of_dispatch_mpp).Value()));
GET_METRIC(tiflash_thread_count, type_max_threads_of_raw).Set(std::max(GET_METRIC(tiflash_thread_count, type_max_threads_of_raw).Value(), GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Value()));
}

Stopwatch watch;
SCOPE_EXIT({
GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Decrement();
GET_METRIC(tiflash_thread_count, type_active_threads_of_dispatch_mpp).Decrement();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_dispatch_mpp_task).Decrement();
GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_dispatch_mpp_task).Observe(watch.elapsedSeconds());
GET_METRIC(tiflash_coprocessor_response_bytes).Increment(response->ByteSizeLong());
Expand Down Expand Up @@ -193,8 +204,17 @@ ::grpc::Status FlashService::EstablishMPPConnection(::grpc::ServerContext * grpc
}
GET_METRIC(tiflash_coprocessor_request_count, type_mpp_establish_conn).Increment();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Increment();
GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Increment();
GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Increment();
if (!tryToResetMaxThreadsMetrics())
{
GET_METRIC(tiflash_thread_count, type_max_threads_of_establish_mpp).Set(std::max(GET_METRIC(tiflash_thread_count, type_max_threads_of_establish_mpp).Value(), GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Value()));
GET_METRIC(tiflash_thread_count, type_max_threads_of_raw).Set(std::max(GET_METRIC(tiflash_thread_count, type_max_threads_of_raw).Value(), GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Value()));
}
Stopwatch watch;
SCOPE_EXIT({
GET_METRIC(tiflash_thread_count, type_total_threads_of_raw).Decrement();
GET_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp).Decrement();
GET_METRIC(tiflash_coprocessor_handling_request_count, type_mpp_establish_conn).Decrement();
GET_METRIC(tiflash_coprocessor_request_duration_seconds, type_mpp_establish_conn).Observe(watch.elapsedSeconds());
// TODO: update the value of metric tiflash_coprocessor_response_bytes.
Expand Down
2 changes: 2 additions & 0 deletions dbms/src/Flash/Mpp/MPPTunnel.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <Common/Exception.h>
#include <Common/FailPoint.h>
#include <Common/ThreadFactory.h>
#include <Common/TiFlashMetrics.h>
#include <Flash/Mpp/MPPTunnel.h>
#include <Flash/Mpp/Utils.h>
#include <Flash/Mpp/getMPPTaskLog.h>
Expand Down Expand Up @@ -119,6 +120,7 @@ template <typename Writer>
void MPPTunnelBase<Writer>::sendLoop()
{
assert(!is_local);
UPDATE_CUR_AND_MAX_METRIC(tiflash_thread_count, type_active_threads_of_establish_mpp, type_max_threads_of_establish_mpp);
String err_msg;
try
{
Expand Down
Loading

0 comments on commit 05f5628

Please sign in to comment.