Heap Memory Based Worker Flag to stop processing new split when in low memory #20946
Conversation
Force-pushed from d545b6a to d67cf20
@@ -254,6 +271,8 @@ public TaskExecutor(
        Duration interruptSplitInterval,
        EmbedVersion embedVersion,
        MultilevelSplitQueue splitQueue,
        boolean memoryBasedSlowDownEnabled,
        double memoryBasedSlowDownThreshold,
Can we have just one config and somehow infer enabled/disabled from that one?
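One way that could look (a minimal sketch with illustrative names, not the PR's actual TaskManagerConfig): let a single threshold property imply enabled/disabled, e.g. a value outside (0, 1) means the feature is off.

public class MemorySlowDownConfig
{
    // Illustrative single property: a value in (0, 1) enables the slow-down,
    // while 0 (the default) or 1.0 effectively disables it
    private double memoryBasedSlowDownThreshold;

    public double getMemoryBasedSlowDownThreshold()
    {
        return memoryBasedSlowDownThreshold;
    }

    public MemorySlowDownConfig setMemoryBasedSlowDownThreshold(double threshold)
    {
        this.memoryBasedSlowDownThreshold = threshold;
        return this;
    }

    // Derived, so no separate boolean config is needed
    public boolean isMemoryBasedSlowDownEnabled()
    {
        return memoryBasedSlowDownThreshold > 0 && memoryBasedSlowDownThreshold < 1.0;
    }
}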
@@ -530,6 +551,13 @@ private synchronized void startSplit(PrioritizedSplitRunner split)

    private synchronized PrioritizedSplitRunner pollNextSplitWorker()
    {
        if (memoryBasedSlowDownEnabled) {
            MemoryUsage memoryUsage = MEMORY_MX_BEAN.getHeapMemoryUsage();
            if (memoryUsage.getUsed() > memoryUsage.getCommitted() * memoryBasedSlowDownThreshold) {
Can you JMH-microbenchmark the cost of these bean calls? I am concerned about introducing this in the hot path for each split.
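For reference, a minimal JMH sketch of what such a microbenchmark could look like (the benchmark setup is assumed, not part of the PR); it measures only the MemoryMXBean heap-usage call that would sit on the split-polling path.

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;

@State(Scope.Benchmark)
public class BenchmarkHeapMemoryUsage
{
    private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();

    @Benchmark
    public long heapUsed()
    {
        // Return the value so the JIT cannot eliminate the call
        MemoryUsage usage = memoryMXBean.getHeapMemoryUsage();
        return usage.getUsed();
    }
}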
I don't think this is where you should plug in: this will simply allow running the existing splits in waitingSplits -- all this does is not release any more "scan/leaf" splits to the MLSQ -- it does nothing about intermediate splits, for example.
Did you consider teaching the MLSQ about this policy instead? It will naturally stall the split runner threads.
The point-in-time checks might not be representative of the overall health. If I am not wrong, the memory allocation will follow a sawtooth pattern.
Relevant article from the Elasticsearch folks, where they rely on the fill rate of the old gen to detect memory pressure:
https://www.elastic.co/blog/found-understanding-memory-pressure-indicator
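As a rough illustration of that approach (a sketch, not part of the PR): the old-gen pool's usage after the last collection can be read via MemoryPoolMXBean, which smooths out the sawtooth compared to a point-in-time heap reading. The pool-name matching below is collector-dependent.

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryPoolMXBean;
import java.lang.management.MemoryType;
import java.lang.management.MemoryUsage;
import java.util.OptionalDouble;

public final class OldGenPressure
{
    private OldGenPressure() {}

    // Fill ratio of the tenured/old-gen pool measured after the last GC
    public static OptionalDouble oldGenFillRatioAfterGc()
    {
        for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
            // "G1 Old Gen", "PS Old Gen", "CMS Old Gen", ... depending on the collector
            if (pool.getType() == MemoryType.HEAP && pool.getName().contains("Old Gen")) {
                MemoryUsage afterGc = pool.getCollectionUsage();
                if (afterGc != null && afterGc.getMax() > 0) {
                    return OptionalDouble.of((double) afterGc.getUsed() / afterGc.getMax());
                }
            }
        }
        return OptionalDouble.empty();
    }
}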
The memory usage check is pulled out into a separate thread, so it won't impact the task/split processing.
Interesting work! Could we create an issue describing the design? I am curious how slowing down split processing will interact with memory management and how it will avoid deadlock.
Force-pushed from d67cf20 to 067f149
Added more details in the description.
I am trying to understand the idea and don't know if it will cause deadlock. But deadlock crossed my mind because it seems this is taking a different route to tackle the same problem as the memory management framework: https://prestodb.io/blog/2019/08/19/memory-tracking
However, that framework also solves the larger problem: when one pool is blocked, to avoid the cluster from deadlocking and not making progress overall, it can kill queries (or promote them to the reserved pool). If my idea about this change is correct, it seems this solves the problem locally but not the global problem. And if that's the case, suppose this framework kicks in: couldn't it cause either a slowdown or a lockup on one worker, which could cause a cluster-wide slowdown as well (since the cluster would be bottlenecked by the one single worker)? Let me know if I'm missing some context or not understanding correctly.
As detailed in the description: we are noticing our low-memory workers crashing due to OOM, and at the time the crashing workers were holding a higher number of splits (running + waiting). Given these workers are already using a lot of memory, we want to prevent them from taking on more work. To achieve that we are introducing this low-memory flag, which prevents the worker from starting splits, whether because a previous one finished or a new one is being submitted as part of a task update request.
As for the memory framework: it is true that it handles memory tracking, but at Meta we noticed the accounted memory in our MemoryPool is far lower than the actual heap usage. That means we can't rely on the memory framework to kick in and kill queries, given how far apart "free memory per the heap" and "free memory per the tracking framework" are.
The change is not trying to solve the overall cluster-level memory issue but a local one, where a worker is running hot and ending up out of memory. To prevent that, we want the worker to do less work for the time being, until the memory frees up, and then pick up work again. Sure, this will slow things down, but in exchange for reliability: without slowing down, the worker can crash, resulting in many more query failures, which is what we want to prevent in this scenario. The feature is also config-driven, so it can be tuned as needed or disabled entirely. Hope this explains the rationale behind this PR.
@tdcmeehan We are looking for a stop-the-bleeding solution to the hot-spot problem, mainly due to the split-brain issue from the disaggregated coordinator. We already have many OKR metric violations this month (you know the UER :p). Off the top of my head, if I have to construct a case for a deadlock: a query is doing a hash join, the build side accumulates very large data, and the hash table pushes heap usage past the threshold. I think that might lead to a deadlock -- the hash table wouldn't be released since the task is unfinished, and the hash join cannot proceed since no more leaf splits would get executed. However, when we set the local memory limit to be significantly lower than the threshold, the query should be killed before the threshold is reached. But this is speculation. @swapsmagic If what I speculated holds true, I guess it would be great to add a foolproof measure when loading the config, for example: when the threshold is below the local memory limit, warn the users about the potential consequence or suggest a correct config combination? Otherwise the PR looks good to me in general. For sure we need to address Tim's concern.
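A sketch of the foolproof measure being suggested (class, method, and parameter names are hypothetical, and this assumes Airlift's format-string Logger): compare the slow-down threshold, expressed in bytes of heap, against the local memory limit at startup and warn if the combination looks risky.

import com.facebook.airlift.log.Logger;

public final class LowMemoryThresholdValidation
{
    private static final Logger log = Logger.get(LowMemoryThresholdValidation.class);

    private LowMemoryThresholdValidation() {}

    // Warn when the slow-down threshold (a fraction of max heap) falls below the
    // local per-query memory limit, since splits could then stall before the
    // memory framework gets a chance to kill the offending query
    public static void validate(double slowDownThreshold, long maxHeapBytes, long localMemoryLimitBytes)
    {
        long thresholdBytes = (long) (maxHeapBytes * slowDownThreshold);
        if (thresholdBytes < localMemoryLimitBytes) {
            log.warn("Memory-based slow-down threshold (%s bytes) is below the local memory limit (%s bytes); " +
                    "consider raising the threshold or lowering the local limit", thresholdBytes, localMemoryLimitBytes);
        }
    }
}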
Force-pushed from 067f149 to cbe2cfd
Added a min-value check so the threshold can't be set too low, to avoid a slowdown or deadlock situation.
Force-pushed from 40e9c9c to e03f31d
It looks good to me.
I want to confirm the semantics -- maybe you should call that out clearly in the PR description.
This PR does not pause running splits when there is memory pressure. It just doesn't take in any new leaf splits when that happens, right? So we should expect to see the waitingSplits eventually run dry, right? The PR description says "slow down", but it is more like "pause taking in new work", i.e. it's very binary.
I don't see any tests for this. Also, I think we should describe what improvements we saw with this in the PR test plan: how you created a memory-pressure scenario and how this PR helped with that. Maybe you can show a graph of RunningSplits / WaitingSplits or describe the same in words. As a control-plane change, it would be great to validate its effectiveness and responsiveness.
// Worker skips processing splits if JVM heap usage crosses the configured threshold
// Helps reduce memory pressure on the worker and avoid OOMs
if (isLowMemory()) {
    log.debug("Skip task scheduling due to low memory");
Would it be better to add this logging in the isLowMemory method, so that relevant extra stats can be logged there as well, providing more insight into that decision?
Maybe also move the counter update there to DRY out this code.
In both places where we check low memory, I put different logging to help understand which flow is being skipped. This will help with debugging issues in the future, which is why it's a debug log and not an info log.
Force-pushed from e03f31d to a9aee78
@swapsmagic Can you please add some JMX metrics for the testing you did?
Force-pushed from 01cc3ff to 11628f0
long memoryThreshold = (long) (maxMemory * threshold);

if (usedMemory > memoryThreshold) {
    if (!taskExecutor.isLowMemory()) {
It may be simpler to have the TaskExecutor do its own state keeping for low-memory mode. It can ensure the idempotency you are shooting for, in addition to the logging, so the call here becomes simply taskExecutor.setLowMemory(usedMemory, maxMemory, memoryThresh).
It should be a low-overhead call since it is done only once every K milliseconds anyway.
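A minimal sketch of what that could look like (method shape and names are assumed, not the PR's final API, and it assumes Airlift's format-string Logger): the executor-side class keeps the flag, logs only on transitions, and the monitor just reports the numbers each interval.

import com.facebook.airlift.log.Logger;

public class LowMemoryStateSketch
{
    private static final Logger log = Logger.get(LowMemoryStateSketch.class);

    // Written only by the single low-memory monitor thread; read by split-polling threads
    private volatile boolean lowMemory;

    public boolean isLowMemory()
    {
        return lowMemory;
    }

    // Called once per polling interval; logs only on state transitions, so the
    // monitor's call site stays simple and the logging stays idempotent
    public void setLowMemory(boolean newValue, long usedMemory, long maxMemory, long memoryThreshold)
    {
        if (lowMemory != newValue) {
            log.debug("%s low memory mode: used=%s max=%s threshold=%s",
                    newValue ? "Enabling" : "Disabling", usedMemory, maxMemory, memoryThreshold);
            lowMemory = newValue;
        }
    }
}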
Adding some comments.
public void checkLowMemory()
{
    MemoryMXBean mbean = ManagementFactory.getMemoryMXBean();
You can initialize this once in the constructor:
private final MemoryMXBean memoryMXBean;
...
this.memoryMXBean = ManagementFactory.getMemoryMXBean();
if (usedMemory > memoryThreshold) {
    if (!taskExecutor.isLowMemory()) {
        log.debug("Enabling Low Memory: Used: " + usedMemory + " Max: " + maxMemory + " Threshold: " + memoryThreshold);
nit: use parameterized logging.
}
else {
    if (taskExecutor.isLowMemory()) {
        log.debug("Enabling Low Memory: Used: " + usedMemory + " Max: " + maxMemory + " Threshold: " + memoryThreshold);
nit: use parameterized logging. Also, shouldn't the message be Disabling instead of Enabling?
private final TimeStat blockedQuantaWallTime = new TimeStat(MICROSECONDS);
private final TimeStat unblockedQuantaWallTime = new TimeStat(MICROSECONDS);

private volatile boolean closed;

private AtomicBoolean lowMemory = new AtomicBoolean(false);
final
Made the variable a volatile boolean.
    if (!taskExecutor.isLowMemory()) {
        log.debug("Enabling Low Memory: Used: " + usedMemory + " Max: " + maxMemory + " Threshold: " + memoryThreshold);
        taskExecutor.setLowMemory(true);
    }
}
else {
    if (taskExecutor.isLowMemory()) {
        log.debug("Enabling Low Memory: Used: " + usedMemory + " Max: " + maxMemory + " Threshold: " + memoryThreshold);
        taskExecutor.setLowMemory(false);
    }
The check-then-set part is interesting. I think even with atomic booleans, the value can change between the check and the set. I'm not sure whether that will create subtle issues in this code; please give it some thought.
Prefer using CAS operations to get rid of the check-then-set:
if (usedMemory > memoryThreshold) {
if (taskExecutor.getLowMemory().compareAndSet(false, true)) {
log.debug("Enabling Low Memory: Used: " + usedMemory + " Max: " + maxMemory + " Threshold: " + memoryThreshold);
}
}
else {
if (taskExecutor.getLowMemory().compareAndSet(true, false)) {
log.debug("Disabling Low Memory: Used: " + usedMemory + " Max: " + maxMemory + " Threshold: " + memoryThreshold);
}
}
Given a single thread is doing the check and update, this should not be an issue.
Force-pushed from a7ada32 to 2f4c54a
Looks good to me. Let's also mention in the description the follow-up change to the memory framework that we are planning.
        taskExecutor.setLowMemory(true);
    }
}
else {
nit: else and if can be merged.
private final TimeStat blockedQuantaWallTime = new TimeStat(MICROSECONDS);
private final TimeStat unblockedQuantaWallTime = new TimeStat(MICROSECONDS);

private volatile boolean closed;

private volatile boolean lowMemory;
Please add a comment saying it should only be updated from a single thread, from the lowMemoryExecutor.
LGTM
Please address the checkstyle failures as well.
Introducing heap-memory-usage-based slow-down of workers. When a worker's memory utilization rises above a configured threshold, it stops processing new splits until the memory usage drops back below the threshold. This helps avoid the scenario where a worker's heap usage is already too high, yet it keeps taking on more work and ends up with an out-of-memory error.
Force-pushed from 2f4c54a to ff3cbb8
As I mentioned, local slowdown of splits may cause distributed issues. Please, can we follow up this PR with two things:
PR #21053 to address it. Blocked because the build is not succeeding, for unrelated reasons.
Description
Introducing a low-memory monitor to keep an eye on heap usage. When heap usage crosses the configured threshold, the worker stops processing new splits but continues processing the existing running, waiting, and blocked splits. When heap memory usage goes back below the threshold, the worker starts accepting new splits again.
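A condensed sketch of the mechanism described above (the polling interval, class shape, and keeping the flag locally are assumptions made to keep the example self-contained; in the PR the flag lives on TaskExecutor and the check runs on its own monitor thread):

import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class LowMemoryMonitorSketch
{
    private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
    private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
    private final double threshold;

    // In the PR the flag lives on TaskExecutor; it is kept here only to make the sketch self-contained
    private volatile boolean lowMemory;

    public LowMemoryMonitorSketch(double threshold)
    {
        this.threshold = threshold;
    }

    public void start()
    {
        // The polling interval is illustrative; the check stays off the split-processing hot path
        executor.scheduleWithFixedDelay(this::checkLowMemory, 1, 1, TimeUnit.SECONDS);
    }

    public void stop()
    {
        executor.shutdownNow();
    }

    public boolean isLowMemory()
    {
        return lowMemory;
    }

    private void checkLowMemory()
    {
        MemoryUsage heap = memoryMXBean.getHeapMemoryUsage();
        if (heap.getMax() < 0) {
            // Max heap is undefined on this JVM; skip the check
            return;
        }
        long memoryThreshold = (long) (heap.getMax() * threshold);
        lowMemory = heap.getUsed() > memoryThreshold;
    }
}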
Motivation and Context
At Meta, we observed workers crashing due to out-of-memory errors, and the crashing workers were running many more splits (running + waiting) when memory usage spiked. Given the MultilevelSplitQueue behavior, workers keep running all splits unless they are blocked. If the number of non-blocked splits is high, this results in memory pressure on the worker and a higher chance of OOM. With this change, we can configure the worker to avoid processing more splits when memory usage is high.
Impact
No Impact
Test Plan
Unit Tests and Meta verifier run
JMX metrics show that when worker memory usage crosses the threshold, the skipped-split counter spikes, and it goes back down as memory drops below the threshold.
Five Minute Counter for Split Skip
Heap Memory Usage
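For context on the counters above: Presto typically exposes such five-minute rates by publishing an Airlift CounterStat over JMX. A minimal sketch (the class and stat names here are illustrative, not the PR's actual metric) could look like:

import com.facebook.airlift.stats.CounterStat;
import org.weakref.jmx.Managed;
import org.weakref.jmx.Nested;

public class SplitSkipStats
{
    private final CounterStat skippedSplits = new CounterStat();

    // Bump the counter each time split scheduling is skipped due to low memory
    public void recordSplitSkipped()
    {
        skippedSplits.update(1);
    }

    // Exposing the CounterStat via JMX gives total, one-, five-, and fifteen-minute rates
    @Managed
    @Nested
    public CounterStat getSkippedSplits()
    {
        return skippedSplits;
    }
}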
Contributor checklist
Release Notes
Please follow release notes guidelines and fill in the release notes below.