diff --git a/dbms/src/Common/BackgroundTask.cpp b/dbms/src/Common/BackgroundTask.cpp index 16a23535541..4d9600d3e13 100644 --- a/dbms/src/Common/BackgroundTask.cpp +++ b/dbms/src/Common/BackgroundTask.cpp @@ -17,7 +17,7 @@ #include namespace DB { -bool process_mem_usage(double & resident_set) +bool process_mem_usage(double & resident_set, Int64 & cur_proc_num_threads, UInt64 & cur_virt_size) { resident_set = 0.0; @@ -31,8 +31,7 @@ bool process_mem_usage(double & resident_set) std::string pid, comm, state, ppid, pgrp, session, tty_nr; std::string tpgid, flags, minflt, cminflt, majflt, cmajflt; std::string utime, stime, cutime, cstime, priority, nice; - std::string proc_num_threads, itrealvalue, starttime; - UInt64 vsize; + std::string itrealvalue, starttime; // the field we want Int64 rss; @@ -40,7 +39,7 @@ bool process_mem_usage(double & resident_set) stat_stream >> pid >> comm >> state >> ppid >> pgrp >> session >> tty_nr >> tpgid >> flags >> minflt >> cminflt >> majflt >> cmajflt >> utime >> stime >> cutime >> cstime >> priority >> nice - >> proc_num_threads >> itrealvalue >> starttime >> vsize >> rss; // don't care about the rest + >> cur_proc_num_threads >> itrealvalue >> starttime >> cur_virt_size >> rss; // don't care about the rest stat_stream.close(); @@ -74,12 +73,16 @@ void CollectProcInfoBackgroundTask::begin() void CollectProcInfoBackgroundTask::memCheckJob() { double resident_set; + Int64 cur_proc_num_threads = 1; + UInt64 cur_virt_size = 0; while (!end_syn) { - process_mem_usage(resident_set); + process_mem_usage(resident_set, cur_proc_num_threads, cur_virt_size); resident_set *= 1024; // unit: byte real_rss = static_cast(resident_set); - + proc_num_threads = cur_proc_num_threads; + proc_virt_size = cur_virt_size; + baseline_of_query_mem_tracker = root_of_query_mem_trackers->get(); usleep(100000); // sleep 100ms } end_fin = true; diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index cece5b4824f..f813d3188b3 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -22,9 +22,15 @@ #include -std::atomic real_rss{0}; +std::atomic real_rss{0}, proc_num_threads{1}, baseline_of_query_mem_tracker{0}; +std::atomic proc_virt_size{0}; MemoryTracker::~MemoryTracker() { + // Destruction of global root mem tracker means the process is shutting down, log and metrics models may have been released! + // So we just skip operations of log or metrics for global root mem trackers. + if (is_global_root) + return; + if (peak) { try @@ -85,9 +91,15 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) if (description) fmt_buf.fmtAppend(" {}", description); - fmt_buf.fmtAppend(": fault injected. real_rss ({}) is much larger than limit ({})", + fmt_buf.fmtAppend(": fault injected. real_rss ({}) is much larger than limit ({}). Debug info, threads of process: {}, memory usage tracked by ProcessList: peak {}, current {}, memory usage not tracked by ProcessList: peak {}, current {} . Virtual memory size: {}", formatReadableSizeWithBinarySuffix(real_rss), - formatReadableSizeWithBinarySuffix(current_limit)); + formatReadableSizeWithBinarySuffix(current_limit), + proc_num_threads.load(), + (root_of_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_query_mem_trackers->peak) : "0"), + (root_of_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_query_mem_trackers->amount) : "0"), + (root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->peak) : "0"), + (root_of_non_query_mem_trackers ? formatReadableSizeWithBinarySuffix(root_of_non_query_mem_trackers->amount) : "0"), + proc_virt_size.load()); throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded); } @@ -111,7 +123,7 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed); bool is_rss_too_large = (!next.load(std::memory_order_relaxed) && current_limit && real_rss > current_limit + current_bytes_rss_larger_than_limit - && will_be > current_limit - (real_rss - current_limit - current_bytes_rss_larger_than_limit)); + && will_be > baseline_of_query_mem_tracker); if (is_rss_too_large || unlikely(current_limit && will_be > current_limit)) { @@ -207,6 +219,9 @@ __thread MemoryTracker * current_memory_tracker = nullptr; thread_local MemoryTracker * current_memory_tracker = nullptr; #endif +std::shared_ptr root_of_non_query_mem_trackers = MemoryTracker::createGlobalRoot(); +std::shared_ptr root_of_query_mem_trackers = MemoryTracker::createGlobalRoot(); + namespace CurrentMemoryTracker { static Int64 MEMORY_TRACER_SUBMIT_THRESHOLD = 1024 * 1024; // 1 MiB diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h index 07d747c9a64..3853324c2cb 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -19,7 +19,8 @@ #include -extern std::atomic real_rss; +extern std::atomic real_rss, proc_num_threads, baseline_of_query_mem_tracker; +extern std::atomic proc_virt_size; namespace CurrentMetrics { extern const Metric MemoryTracking; @@ -44,6 +45,8 @@ class MemoryTracker : public std::enable_shared_from_this /// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability. double fault_probability = 0; + bool is_global_root = false; + /// To test the accuracy of memory track, it throws an exception when the part exceeding the tracked amount is greater than accuracy_diff_for_test. std::atomic accuracy_diff_for_test{0}; @@ -64,6 +67,11 @@ class MemoryTracker : public std::enable_shared_from_this : limit(limit_) {} + explicit MemoryTracker(Int64 limit_, bool is_global_root) + : limit(limit_) + , is_global_root(is_global_root) + {} + public: /// Using `std::shared_ptr` and `new` instread of `std::make_shared` is because `std::make_shared` cannot call private constructors. static MemoryTrackerPtr create(Int64 limit = 0) @@ -78,6 +86,11 @@ class MemoryTracker : public std::enable_shared_from_this } } + static MemoryTrackerPtr createGlobalRoot() + { + return std::shared_ptr(new MemoryTracker(0, true)); + } + ~MemoryTracker(); /** Call the following functions before calling of corresponding operations with memory allocators. @@ -141,6 +154,9 @@ extern __thread MemoryTracker * current_memory_tracker; extern thread_local MemoryTracker * current_memory_tracker; #endif +extern std::shared_ptr root_of_non_query_mem_trackers; +extern std::shared_ptr root_of_query_mem_trackers; + /// Convenience methods, that use current_memory_tracker if it is available. namespace CurrentMemoryTracker { diff --git a/dbms/src/Interpreters/ProcessList.h b/dbms/src/Interpreters/ProcessList.h index a88c42db055..1484bbe628e 100644 --- a/dbms/src/Interpreters/ProcessList.h +++ b/dbms/src/Interpreters/ProcessList.h @@ -299,7 +299,7 @@ class ProcessList ProcessList(size_t max_size_ = 0) : cur_size(0) , max_size(max_size_) - , total_memory_tracker(MemoryTracker::create()) + , total_memory_tracker(root_of_query_mem_trackers) {} using EntryPtr = std::shared_ptr; diff --git a/dbms/src/Storages/BackgroundProcessingPool.cpp b/dbms/src/Storages/BackgroundProcessingPool.cpp index 8d914aac2a5..b7ead56e5a9 100644 --- a/dbms/src/Storages/BackgroundProcessingPool.cpp +++ b/dbms/src/Storages/BackgroundProcessingPool.cpp @@ -155,6 +155,7 @@ void BackgroundProcessingPool::threadFunction(size_t thread_idx) } auto memory_tracker = MemoryTracker::create(); + memory_tracker->setNext(root_of_non_query_mem_trackers.get()); memory_tracker->setMetric(CurrentMetrics::MemoryTrackingInBackgroundProcessingPool); current_memory_tracker = memory_tracker.get();