Skip to content

Commit

Permalink
add track info for mem tracker fault inject (#6156)
Browse files Browse the repository at this point in the history
close #6158
  • Loading branch information
bestwoody authored Oct 20, 2022
1 parent 71d5014 commit ce64c1c
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 12 deletions.
15 changes: 9 additions & 6 deletions dbms/src/Common/BackgroundTask.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
#include <fstream>
namespace DB
{
bool process_mem_usage(double & resident_set)
bool process_mem_usage(double & resident_set, Int64 & cur_proc_num_threads, UInt64 & cur_virt_size)
{
resident_set = 0.0;

Expand All @@ -31,16 +31,15 @@ bool process_mem_usage(double & resident_set)
std::string pid, comm, state, ppid, pgrp, session, tty_nr;
std::string tpgid, flags, minflt, cminflt, majflt, cmajflt;
std::string utime, stime, cutime, cstime, priority, nice;
std::string proc_num_threads, itrealvalue, starttime;
UInt64 vsize;
std::string itrealvalue, starttime;

// the field we want
Int64 rss;

stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr
>> tpgid >> flags >> minflt >> cminflt >> majflt >> cmajflt
>> utime >> stime >> cutime >> cstime >> priority >> nice
>> proc_num_threads >> itrealvalue >> starttime >> vsize >> rss; // don't care about the rest
>> cur_proc_num_threads >> itrealvalue >> starttime >> cur_virt_size >> rss; // don't care about the rest

stat_stream.close();

Expand Down Expand Up @@ -74,12 +73,16 @@ void CollectProcInfoBackgroundTask::begin()
void CollectProcInfoBackgroundTask::memCheckJob()
{
double resident_set;
Int64 cur_proc_num_threads = 1;
UInt64 cur_virt_size = 0;
while (!end_syn)
{
process_mem_usage(resident_set);
process_mem_usage(resident_set, cur_proc_num_threads, cur_virt_size);
resident_set *= 1024; // unit: byte
real_rss = static_cast<Int64>(resident_set);

proc_num_threads = cur_proc_num_threads;
proc_virt_size = cur_virt_size;
baseline_of_query_mem_tracker = root_of_query_mem_trackers->get();
usleep(100000); // sleep 100ms
}
end_fin = true;
Expand Down
23 changes: 19 additions & 4 deletions dbms/src/Common/MemoryTracker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,15 @@

#include <iomanip>

std::atomic<Int64> real_rss{0};
// Process-wide statistics shared through atomics. They are refreshed by
// CollectProcInfoBackgroundTask::memCheckJob(), which loops with a 100ms sleep:
//   real_rss                      - resident set size, converted to bytes by memCheckJob()
//   proc_num_threads              - thread-count field parsed by process_mem_usage()
//   baseline_of_query_mem_tracker - root_of_query_mem_trackers->get() sampled at the last refresh
std::atomic<Int64> real_rss{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0};
// Virtual memory size field parsed by process_mem_usage() (unit presumably bytes — TODO confirm).
std::atomic<UInt64> proc_virt_size{0};
MemoryTracker::~MemoryTracker()
{
// Destruction of a global root mem tracker means the process is shutting down; the logging and metrics facilities may already have been released!
// So we skip all log and metrics operations for global root mem trackers.
if (is_global_root)
return;

if (peak)
{
try
Expand Down Expand Up @@ -85,9 +91,15 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
if (description)
fmt_buf.fmtAppend(" {}", description);

fmt_buf.fmtAppend(": fault injected. real_rss ({}) is much larger than limit ({})",
fmt_buf.fmtAppend(": fault injected. real_rss ({}) is much larger than limit ({}). Debug info, threads of process: {}, memory usage tracked by ProcessList: peak {}, current {}, memory usage not tracked by ProcessList: peak {}, current {} . Virtual memory size: {}",
formatReadableSizeWithBinarySuffix(real_rss),
formatReadableSizeWithBinarySuffix(current_limit));
formatReadableSizeWithBinarySuffix(current_limit),
proc_num_threads.load(),
(root_of_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_query_mem_trackers->peak) : "0"),
(root_of_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_query_mem_trackers->amount) : "0"),
(root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->peak) : "0"),
(root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->amount) : "0"),
proc_virt_size.load());
throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded);
}

Expand All @@ -111,7 +123,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit)
Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed);
bool is_rss_too_large = (!next.load(std::memory_order_relaxed) && current_limit
&& real_rss > current_limit + current_bytes_rss_larger_than_limit
&& will_be > current_limit - (real_rss - current_limit - current_bytes_rss_larger_than_limit));
&& will_be > baseline_of_query_mem_tracker);
if (is_rss_too_large
|| unlikely(current_limit && will_be > current_limit))
{
Expand Down Expand Up @@ -207,6 +219,9 @@ __thread MemoryTracker * current_memory_tracker = nullptr;
thread_local MemoryTracker * current_memory_tracker = nullptr;
#endif

// Global roots of the memory tracker hierarchy. Both are built with createGlobalRoot(), so
// is_global_root is set and ~MemoryTracker() returns early for them (no log/metrics work).
// Root for memory not tracked by ProcessList: BackgroundProcessingPool thread trackers chain to it via setNext().
std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers = MemoryTracker::createGlobalRoot();
// Root for query memory: ProcessList uses it as its total_memory_tracker.
std::shared_ptr<MemoryTracker> root_of_query_mem_trackers = MemoryTracker::createGlobalRoot();

namespace CurrentMemoryTracker
{
static Int64 MEMORY_TRACER_SUBMIT_THRESHOLD = 1024 * 1024; // 1 MiB
Expand Down
18 changes: 17 additions & 1 deletion dbms/src/Common/MemoryTracker.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@

#include <atomic>

extern std::atomic<Int64> real_rss;
// Process-wide statistics defined in MemoryTracker.cpp and written by
// CollectProcInfoBackgroundTask::memCheckJob(); MemoryTracker::alloc() reads them
// for its RSS check and for the fault-injection debug message.
extern std::atomic<Int64> real_rss, proc_num_threads, baseline_of_query_mem_tracker;
// Virtual memory size field of the process as parsed by process_mem_usage().
extern std::atomic<UInt64> proc_virt_size;
namespace CurrentMetrics
{
extern const Metric MemoryTracking;
Expand All @@ -44,6 +45,8 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
/// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability.
double fault_probability = 0;

bool is_global_root = false;

/// To test the accuracy of memory tracking, it throws an exception when the part exceeding the tracked amount is greater than accuracy_diff_for_test.
std::atomic<Int64> accuracy_diff_for_test{0};

Expand All @@ -64,6 +67,11 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
: limit(limit_)
{}

/// Constructs a tracker with the given limit and records whether it is a global root.
/// Global roots are obtained through createGlobalRoot(); ~MemoryTracker() returns early
/// for them, skipping log and metrics operations.
/// @param limit_ initial value of `limit`.
/// @param is_global_root_ true when the tracker is a process-wide root; named with the
///        trailing underscore used for `limit_` so it does not shadow the member.
explicit MemoryTracker(Int64 limit_, bool is_global_root_)
    : limit(limit_)
    , is_global_root(is_global_root_)
{}

public:
/// `std::shared_ptr` with `new` is used instead of `std::make_shared` because `std::make_shared` cannot call private constructors.
static MemoryTrackerPtr create(Int64 limit = 0)
Expand All @@ -78,6 +86,11 @@ class MemoryTracker : public std::enable_shared_from_this<MemoryTracker>
}
}

/// Builds a tracker flagged as a global root: limit 0 and is_global_root = true.
/// The object is allocated with `new` and wrapped in a `std::shared_ptr`, going through
/// the two-argument (limit, is_global_root) constructor.
static MemoryTrackerPtr createGlobalRoot()
{
    std::shared_ptr<MemoryTracker> global_root(new MemoryTracker(0, true));
    return global_root;
}

~MemoryTracker();

/** Call the following functions before calling of corresponding operations with memory allocators.
Expand Down Expand Up @@ -141,6 +154,9 @@ extern __thread MemoryTracker * current_memory_tracker;
extern thread_local MemoryTracker * current_memory_tracker;
#endif

// Global root trackers, defined in MemoryTracker.cpp via MemoryTracker::createGlobalRoot().
// root_of_non_query_mem_trackers: parent (via setNext) of BackgroundProcessingPool thread trackers.
extern std::shared_ptr<MemoryTracker> root_of_non_query_mem_trackers;
// root_of_query_mem_trackers: used by ProcessList as its total_memory_tracker.
extern std::shared_ptr<MemoryTracker> root_of_query_mem_trackers;

/// Convenience methods, that use current_memory_tracker if it is available.
namespace CurrentMemoryTracker
{
Expand Down
2 changes: 1 addition & 1 deletion dbms/src/Interpreters/ProcessList.h
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,7 @@ class ProcessList
ProcessList(size_t max_size_ = 0)
: cur_size(0)
, max_size(max_size_)
, total_memory_tracker(MemoryTracker::create())
, total_memory_tracker(root_of_query_mem_trackers)
{}

using EntryPtr = std::shared_ptr<ProcessListEntry>;
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Storages/BackgroundProcessingPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ void BackgroundProcessingPool::threadFunction(size_t thread_idx)
}

auto memory_tracker = MemoryTracker::create();
memory_tracker->setNext(root_of_non_query_mem_trackers.get());
memory_tracker->setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool);
current_memory_tracker = memory_tracker.get();

Expand Down

0 comments on commit ce64c1c

Please sign in to comment.