diff --git a/dbms/src/Common/MemoryTracker.cpp b/dbms/src/Common/MemoryTracker.cpp index a70a8017cac..e4218edfe74 100644 --- a/dbms/src/Common/MemoryTracker.cpp +++ b/dbms/src/Common/MemoryTracker.cpp @@ -77,8 +77,8 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) if (check_memory_limit) { Int64 current_limit = limit.load(std::memory_order_relaxed); - - if (unlikely(!next.load(std::memory_order_relaxed) && accuracy_diff_for_test && current_limit && real_rss > accuracy_diff_for_test + current_limit)) + Int64 current_accuracy_diff_for_test = accuracy_diff_for_test.load(std::memory_order_relaxed); + if (unlikely(!next.load(std::memory_order_relaxed) && current_accuracy_diff_for_test && current_limit && real_rss > current_accuracy_diff_for_test + current_limit)) { DB::FmtBuffer fmt_buf; fmt_buf.append("Memory tracker accuracy "); @@ -108,9 +108,10 @@ void MemoryTracker::alloc(Int64 size, bool check_memory_limit) throw DB::TiFlashException(fmt_buf.toString(), DB::Errors::Coprocessor::MemoryLimitExceeded); } + Int64 current_bytes_rss_larger_than_limit = bytes_rss_larger_than_limit.load(std::memory_order_relaxed); bool is_rss_too_large = (!next.load(std::memory_order_relaxed) && current_limit - && real_rss > current_limit + bytes_rss_larger_than_limit - && will_be > current_limit - (real_rss - current_limit - bytes_rss_larger_than_limit)); + && real_rss > current_limit + current_bytes_rss_larger_than_limit + && will_be > current_limit - (real_rss - current_limit - current_bytes_rss_larger_than_limit)); if (is_rss_too_large || unlikely(current_limit && will_be > current_limit)) { diff --git a/dbms/src/Common/MemoryTracker.h b/dbms/src/Common/MemoryTracker.h index 453b5c4814d..5248303adec 100644 --- a/dbms/src/Common/MemoryTracker.h +++ b/dbms/src/Common/MemoryTracker.h @@ -39,13 +39,13 @@ class MemoryTracker : public std::enable_shared_from_this std::atomic limit{0}; // How many bytes RSS(Resident Set Size) can be larger than limit(max_memory_usage_for_all_queries). Default: 5GB - Int64 bytes_rss_larger_than_limit = 5368709120; + std::atomic bytes_rss_larger_than_limit{1073741824}; /// To test exception safety of calling code, memory tracker throws an exception on each memory allocation with specified probability. double fault_probability = 0; /// To test the accuracy of memory track, it throws an exception when the part exceeding the tracked amount is greater than accuracy_diff_for_test. - Int64 accuracy_diff_for_test = 0; + std::atomic accuracy_diff_for_test{0}; /// Singly-linked list. All information will be passed to subsequent memory trackers also (it allows to implement trackers hierarchy). /// In terms of tree nodes it is the list of parents. Lifetime of these trackers should "include" lifetime of current tracker. @@ -103,11 +103,11 @@ class MemoryTracker : public std::enable_shared_from_this */ void setOrRaiseLimit(Int64 value); - void setBytesThatRssLargerThanLimit(Int64 value) { bytes_rss_larger_than_limit = value; } + void setBytesThatRssLargerThanLimit(Int64 value) { bytes_rss_larger_than_limit.store(value, std::memory_order_relaxed); } void setFaultProbability(double value) { fault_probability = value; } - void setAccuracyDiffForTest(double value) { accuracy_diff_for_test = value; } + void setAccuracyDiffForTest(Int64 value) { accuracy_diff_for_test.store(value, std::memory_order_relaxed); } /// next should be changed only once: from nullptr to some value. void setNext(MemoryTracker * elem) diff --git a/dbms/src/Flash/Mpp/MPPTunnel.cpp b/dbms/src/Flash/Mpp/MPPTunnel.cpp index fca29852a16..2d872d2e58b 100644 --- a/dbms/src/Flash/Mpp/MPPTunnel.cpp +++ b/dbms/src/Flash/Mpp/MPPTunnel.cpp @@ -113,8 +113,7 @@ void MPPTunnel::finishSendQueue(bool drain) void MPPTunnel::close(const String & reason) { SCOPE_EXIT({ - // ensure the tracked memory is released and updated before memory tracker(in ProcListEntry) is released - finishSendQueue(true); // drain the send_queue when close + finishSendQueue(true); // drain the send_queue when close, to release useless memory }); { std::unique_lock lk(mu);