
Heap Memory Based Worker Flag to stop processing new split when in low memory #20946

Merged

Conversation

@swapsmagic swapsmagic (Contributor) commented Sep 22, 2023

Description

This PR introduces a low memory monitor that keeps an eye on heap usage. When heap usage crosses the configured threshold, the worker stops processing new splits but continues processing the existing running, waiting, and blocked splits. When heap memory usage goes back below the threshold, the worker starts accepting new splits again.
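
As a rough illustration of the mechanism described above (a sketch with made-up names, not the PR's actual classes), the gate boils down to comparing used heap against a fraction of committed heap:

    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryMXBean;
    import java.lang.management.MemoryUsage;

    // Illustrative sketch only: a worker-side gate that declines to hand out new
    // splits while heap usage stays above a configured fraction of committed heap.
    public class LowMemorySplitGate
    {
        private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();
        private final double threshold; // e.g. 0.9 means 90% of committed heap

        public LowMemorySplitGate(double threshold)
        {
            this.threshold = threshold;
        }

        public boolean shouldSkipNewSplits()
        {
            MemoryUsage heap = memoryMXBean.getHeapMemoryUsage();
            // Running, waiting, and blocked splits are unaffected; only the
            // admission of new splits is paused while this returns true.
            return heap.getUsed() > heap.getCommitted() * threshold;
        }
    }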

Motivation and Context

At Meta, we observed workers crashing due to out of memory, and the crashing workers were running many more splits (running + waiting) when memory usage spiked. Given the Multi Level Split Queue behavior, workers keep running all splits unless they are blocked. If the number of non-blocked splits is high, this results in memory pressure on the worker and a higher chance of OOM. With this change, we can configure the worker to avoid processing more splits when memory usage is high.

Impact

No Impact

Test Plan

Unit Tests and Meta verifier run

JMX metrics show that when worker memory usage crosses the threshold, the skip-split counter spikes, and it goes back down as memory drops below the threshold.

Five Minute Counter for Split Skip
[Screenshot: five-minute counter of skipped splits, 2023-10-03]

Heap Memory Usage
[Screenshot: heap memory usage, 2023-10-03]

Contributor checklist

  • Please make sure your submission complies with our development, formatting, commit message, and attribution guidelines.
  • PR description addresses the issue accurately and concisely. If the change is non-trivial, a GitHub Issue is referenced.
  • Documented new properties (with its default value), SQL syntax, functions, or other functionality.
  • If release notes are required, they follow the release notes guidelines.
  • Adequate tests were added if applicable.
  • CI passed.

Release Notes

Please follow release notes guidelines and fill in the release notes below.

== RELEASE NOTES ==

General Changes
* Add a config property to enable memory-based slowdown of split processing. This can be enabled via `task.memory-based-slowdown-threshold`.
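
For illustration, the property could be set in a worker's config.properties like this (the 0.9 value is an example, not a recommended default):

    # Example only: pause new-split admission once heap usage exceeds 90% of committed heap
    task.memory-based-slowdown-threshold=0.9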

@swapsmagic swapsmagic requested a review from a team as a code owner September 22, 2023 22:08
@swapsmagic swapsmagic force-pushed the worker_to_process_based_on_heap_usage branch from d545b6a to d67cf20 Compare September 22, 2023 22:29
@@ -254,6 +271,8 @@ public TaskExecutor(
Duration interruptSplitInterval,
EmbedVersion embedVersion,
MultilevelSplitQueue splitQueue,
boolean memoryBasedSlowDownEnabled,
double memoryBasedSlowDownThreshold,
Contributor

Can we have just one config and somehow infer enabled/disabled from that one?
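
A sketch of how that could look, assuming the existing memoryBasedSlowDownThreshold field from the diff above (treating a non-positive threshold as "disabled"):

    // Sketch: derive enabled/disabled from the threshold itself instead of a
    // separate boolean; a non-positive threshold means the feature is off.
    private boolean isMemoryBasedSlowDownEnabled()
    {
        return memoryBasedSlowDownThreshold > 0;
    }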

@@ -530,6 +551,13 @@ private synchronized void startSplit(PrioritizedSplitRunner split)

private synchronized PrioritizedSplitRunner pollNextSplitWorker()
{
if (memoryBasedSlowDownEnabled) {
MemoryUsage memoryUsage = MEMORY_MX_BEAN.getHeapMemoryUsage();
if (memoryUsage.getUsed() > memoryUsage.getCommitted() * memoryBasedSlowDownThreshold) {
Contributor

Can you JMH microbenchmark the cost of these bean calls -- I am concerned about introducing this in the hot path for each split.
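
A minimal JMH sketch for that measurement, assuming JMH is available in the benchmark module (class and method names are illustrative):

    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryMXBean;
    import org.openjdk.jmh.annotations.Benchmark;
    import org.openjdk.jmh.annotations.Scope;
    import org.openjdk.jmh.annotations.State;

    @State(Scope.Benchmark)
    public class BenchmarkHeapMemoryUsage
    {
        private final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean();

        @Benchmark
        public long readHeapMemoryUsage()
        {
            // Measures the cost of the MX bean call that would sit on the split poll path.
            return memoryMXBean.getHeapMemoryUsage().getUsed();
        }
    }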

Contributor

I don't think this is where you should plug in: this will simply allow running the existing splits in waitingSplits -- all this does is not release any more "scan/leaf" splits to the MLSQ -- it does nothing about intermediate splits, for example.

Did you consider teaching the MLSQ about this policy instead? It would naturally stall the split runner threads.

Contributor

The point-in-time checks might not be representative of overall health. If I am not wrong, memory allocation will follow a sawtooth pattern.
Relevant article from the Elasticsearch folks, where they rely on the fill rate of the old gen to detect memory pressure:
https://www.elastic.co/blog/found-understanding-memory-pressure-indicator
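
For reference, a sketch of how old-gen occupancy after the last collection could be read from the standard MX beans (the pool-name match is GC-dependent and is an assumption here):

    import java.lang.management.ManagementFactory;
    import java.lang.management.MemoryPoolMXBean;
    import java.lang.management.MemoryType;
    import java.lang.management.MemoryUsage;

    // Sketch: old-gen fill rate after the most recent GC as a memory-pressure signal,
    // along the lines of the Elasticsearch article above.
    public final class OldGenPressure
    {
        private OldGenPressure() {}

        public static double fillRateAfterLastGc()
        {
            for (MemoryPoolMXBean pool : ManagementFactory.getMemoryPoolMXBeans()) {
                // Pool names vary by collector, e.g. "G1 Old Gen" or "PS Old Gen"
                if (pool.getType() == MemoryType.HEAP && pool.getName().contains("Old Gen")) {
                    MemoryUsage afterGc = pool.getCollectionUsage();
                    if (afterGc != null && afterGc.getMax() > 0) {
                        return (double) afterGc.getUsed() / afterGc.getMax();
                    }
                }
            }
            return 0;
        }
    }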

Contributor Author

The memory usage check is pulled out into a separate thread, so it won't impact the task/split processing.
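
A sketch of what that separate thread could look like (interval and names are assumptions, not the PR's actual code):

    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;

    // Sketch: the heap check runs on its own single-threaded scheduler, so the
    // split-processing hot path only reads a flag and never calls the MX bean.
    public class LowMemoryMonitorScheduler
    {
        private final ScheduledExecutorService lowMemoryExecutor = Executors.newSingleThreadScheduledExecutor();

        public void start(Runnable checkLowMemory)
        {
            // checkLowMemory stands in for the routine that reads heap usage,
            // compares it to the threshold, and flips the worker's low-memory flag.
            lowMemoryExecutor.scheduleWithFixedDelay(checkLowMemory, 1, 1, TimeUnit.SECONDS);
        }
    }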

@tdcmeehan (Contributor)

Interesting work! Could we create an issue describing the design? I am curious how slowing down split processing will interact with memory management and how it will avoid deadlock.

@swapsmagic swapsmagic force-pushed the worker_to_process_based_on_heap_usage branch from d67cf20 to 067f149 Compare September 27, 2023 00:23
@swapsmagic (Contributor Author)

Interesting work! Could we create an issue describing the design? I am curious how slowing down split processing will interact with memory management and how it will avoid deadlock.

Added more details in the description.

@swapsmagic (Contributor Author)

how it will avoid deadlock.
@tdcmeehan can you share more details on what kind of deadlock you are anticipating in this scenario?

@tdcmeehan (Contributor)

I am trying to understand the idea and don't know if it will cause deadlock.

But deadlock crossed my mind because it seems this is taking a different route to tackle the same problem as the memory management framework: https://prestodb.io/blog/2019/08/19/memory-tracking

However, that framework also solves the larger problem: when one pool is blocked, to avoid the cluster from deadlocking and not making progress overall, it can kill queries (or promote them to the reserved pool).

If my idea about this change is correct, it seems this solves the problem locally but not the global problem. And if that's the case, suppose this framework kicks in: couldn't it cause either a slowdown or a lockup on one worker, which could cause a cluster-wide slowdown as well (since the cluster would be bottlenecked by the one single worker)?

Let me know if I'm missing some context or not understanding correctly.

@swapsmagic swapsmagic (Contributor Author) commented Sep 28, 2023

I am trying to understand the idea and don't know if it will cause deadlock.

As detailed in the description: we are noticing our low-memory workers crashing due to OOM, and at the time of the crash those workers had a higher number of splits (running + waiting). Given the workers are already using a lot of memory, we want to prevent them from taking on more work. To achieve that, we are introducing this low-memory flag, which prevents a worker from running new splits when a previous one finishes or when new ones are submitted as part of task update requests.

But deadlock crossed my mind because it seems this is taking a different route to tackle the same problem as the memory management framework: https://prestodb.io/blog/2019/08/19/memory-tracking

However, that framework also solves the larger problem: when one pool is blocked, to avoid the cluster from deadlocking and not making progress overall, it can kill queries (or promote them to the reserved pool).

True, the memory framework handles memory tracking. But at Meta we noticed that the memory accounted for in our MemoryPool is far lower than the actual heap usage. That means we can't rely on the memory framework to kick in and kill queries, given how large the gap is between how much memory is free according to the heap and how much is free according to the memory tracking framework.

If my idea about this change is correct, it seems this solves the problem locally but not the global problem. And if that's the case, suppose this framework kicks in: couldn't it cause either a slowdown or a lockup on one worker, which could cause a cluster-wide slowdown as well (since the cluster would be bottlenecked by the one single worker)?

Let me know if I'm missing some context or not understanding correctly.

The change is not trying to solve the overall cluster-level memory issue but a local issue where a worker is running hot and ends up running out of memory. To prevent that, we want the worker to do less work for the time being, until memory frees up, and then pick up work again. Sure, this will slow things down, but in exchange for reliability: without slowing down, the worker can crash, resulting in many more query failures, which is what we want to prevent in this scenario. Also, the feature is config-driven, so it can be tuned as needed and can also be disabled. Hope this explains the rationale behind this PR.

@MnO2 MnO2 (Contributor) commented Sep 28, 2023


@tdcmeehan We are looking for a stop-the-bleeding solution to the hot spot problem, mainly due to the split-brain issue from the disaggregated coordinator. We already have many OKR metric violations this month (you know the UER :p). Off the top of my head, if I have to construct a case for deadlock: it is when a query does a hash join, the build side accumulates very large data, and the hash table makes heap usage exceed the threshold. I think that might lead to a deadlock: the hash table wouldn't be released since the task is unfinished, and the hash join cannot proceed since no more leaf splits would get executed. However, when we set the local memory limit significantly lower than the threshold, the query should be killed before the threshold is reached. But this is speculation on my part.

@swapsmagic If what I speculated holds true, I guess it would be great to add a foolproof measure when loading the config. For example: when the threshold is below the local memory limit, warn the users about the potential consequence or suggest a correct config combination. Otherwise the PR looks good to me in general. For sure we need to address Tim's concern.

@swapsmagic swapsmagic force-pushed the worker_to_process_based_on_heap_usage branch from 067f149 to cbe2cfd Compare September 28, 2023 16:31
@swapsmagic (Contributor Author)

@swapsmagic If what I speculated holds true, I guess it would be great to add a foolproof measure when loading the config. For example: when the threshold is below the local memory limit, warn the users about the potential consequence or suggest a correct config combination. Otherwise the PR looks good to me in general. For sure we need to address Tim's concern.

Added a min value check so the threshold can't be set too low, to avoid a slowdown or deadlock situation.
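
As an illustration only (the actual bound and annotations in the PR may differ), such a floor could be expressed on the config getter with validation annotations:

    import javax.validation.constraints.DecimalMax;
    import javax.validation.constraints.DecimalMin;

    // Sketch: keep the threshold in a sane range so it cannot be set low enough
    // to effectively stall the worker; the 0.5 floor is an example value.
    @DecimalMin("0.5")
    @DecimalMax("1.0")
    public double getMemoryBasedSlowDownThreshold()
    {
        return memoryBasedSlowDownThreshold;
    }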

@swapsmagic swapsmagic force-pushed the worker_to_process_based_on_heap_usage branch 2 times, most recently from 40e9c9c to e03f31d Compare September 28, 2023 18:47
@MnO2 MnO2 (Contributor) left a comment

It looks good to me.

@agrawaldevesh agrawaldevesh (Contributor) left a comment

I want to confirm the semantics -- maybe you should call that out clearly in the PR description.

This PR does not pause running splits when there is memory pressure. It just doesn't take in any new leaf splits when that happens, right? So we should expect to see waitingSplits eventually run dry, right? The PR description says "slow down", but it is more like "pause taking in new work", i.e. it's very binary.

I don't see any tests for this. Also, I think we should describe in the PR test plan what improvements we saw with this: how you created a memory pressure scenario and how this PR helped with it. Maybe you can show a graph of RunningSplits / WaitingSplits or describe the same in words. As a control plane change, it would be great to validate its effectiveness and responsiveness.

// Worker skips processing new splits if JVM heap usage crosses the configured threshold
//Helps reduce memory pressure on the worker and avoid OOMs
if (isLowMemory()) {
log.debug("Skip task scheduling due to low memory");
Contributor

Would it be better to add this logging in the isLowMemory method, so that relevant extra stats can be logged there as well, providing more insight into that decision?

Also, maybe move the counter update there as well, to DRY out this code.

Contributor Author

In both places where we check for low memory, I added different log messages to help understand which flow is being skipped. This will help with debugging issues in the future, which is why it's a debug log and not an info log.

@swapsmagic swapsmagic force-pushed the worker_to_process_based_on_heap_usage branch from e03f31d to a9aee78 Compare October 2, 2023 18:51
@jainxrohit (Contributor)

@swapsmagic Can you please add some JMX metrics for the testing you did?

@jainxrohit jainxrohit self-requested a review October 3, 2023 04:53
@swapsmagic swapsmagic force-pushed the worker_to_process_based_on_heap_usage branch 2 times, most recently from 01cc3ff to 11628f0 Compare October 3, 2023 22:38
long memoryThreshold = (long) (maxMemory * threshold);

if (usedMemory > memoryThreshold) {
if (!taskExecutor.isLowMemory()) {
Contributor

It may be simpler to have the TaskExecutor do its own state keeping for low memory mode. It can ensure the idempotency you are shooting for, in addition to the logging. The call here can then become as simple as taskExecutor.setLowMemory(usedMemory, maxMemory, memoryThresh).

It should be a low-overhead call, since it is only done once every K milliseconds anyway.
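
A sketch of the suggested shape (signature and wording are illustrative; lowMemory and log are the TaskExecutor fields already used elsewhere in this PR):

    // Sketch: TaskExecutor owns the low-memory state transition and logs only when
    // the state actually changes, so the periodic monitor just reports what it measured.
    public void setLowMemory(long usedMemory, long maxMemory, long memoryThreshold)
    {
        boolean newValue = usedMemory > memoryThreshold;
        if (lowMemory != newValue) {
            log.debug("%s low memory mode: used=%s max=%s threshold=%s",
                    newValue ? "Enabling" : "Disabling", usedMemory, maxMemory, memoryThreshold);
            lowMemory = newValue;
        }
    }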

@ajaygeorge ajaygeorge (Contributor) left a comment

Adding some comments.


public void checkLowMemory()
{
MemoryMXBean mbean = ManagementFactory.getMemoryMXBean();
Contributor

you can initialize this once in the constructor.
private final MemoryMXBean memoryMXBean;
...
this.memoryMXBean = ManagementFactory.getMemoryMXBean();


if (usedMemory > memoryThreshold) {
if (!taskExecutor.isLowMemory()) {
log.debug("Enabling Low Memory: Used: " + usedMemory + " Max: " + maxMemory + " Threshold: " + memoryThreshold);
Contributor

nit: use parameterized logging.
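
For example, assuming Presto's airlift-style Logger with format-string varargs, the concatenation could become:

    // Sketch: parameterized logging instead of string concatenation
    log.debug("Enabling Low Memory: Used: %s Max: %s Threshold: %s", usedMemory, maxMemory, memoryThreshold);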

}
else {
if (taskExecutor.isLowMemory()) {
log.debug("Enabling Low Memory: Used: " + usedMemory + " Max: " + maxMemory + " Threshold: " + memoryThreshold);
Contributor

nit: use parameterized logging. Also, shouldn't the message be "Disabling" instead of "Enabling"?


private final TimeStat blockedQuantaWallTime = new TimeStat(MICROSECONDS);
private final TimeStat unblockedQuantaWallTime = new TimeStat(MICROSECONDS);

private volatile boolean closed;

private AtomicBoolean lowMemory = new AtomicBoolean(false);
Contributor

final

Contributor Author

Made the variable a volatile boolean.

Comment on lines 73 to 82
if (!taskExecutor.isLowMemory()) {
log.debug("Enabling Low Memory: Used: " + usedMemory + " Max: " + maxMemory + " Threshold: " + memoryThreshold);
taskExecutor.setLowMemory(true);
}
}
else {
if (taskExecutor.isLowMemory()) {
log.debug("Enabling Low Memory: Used: " + usedMemory + " Max: " + maxMemory + " Threshold: " + memoryThreshold);
taskExecutor.setLowMemory(false);
}
Contributor

The check-then-set part is interesting. I think even with atomic booleans, the value can change between the time you check and the time you set. I'm not sure whether that will create subtle issues in this code; please give it some thought.

Prefer using CAS operations to get rid of the check-then-set:

        if (usedMemory > memoryThreshold) {
            if (taskExecutor.getLowMemory().compareAndSet(false, true)) {
                log.debug("Enabling Low Memory: Used: " + usedMemory + " Max: " + maxMemory + " Threshold: " + memoryThreshold);
            }
        }
        else {
            if (taskExecutor.getLowMemory().compareAndSet(true, false)) {
                log.debug("Disabling Low Memory: Used: " + usedMemory + " Max: " + maxMemory + " Threshold: " + memoryThreshold);
            }
        }

Contributor Author

Given that a single thread is doing the check and update, this should not be an issue.

@swapsmagic swapsmagic force-pushed the worker_to_process_based_on_heap_usage branch 2 times, most recently from a7ada32 to 2f4c54a Compare October 4, 2023 16:36
@swapsmagic swapsmagic changed the title from "Heap Memory Based Worker Split Processing Slow Down" to "Heap Memory Based Worker Flag to stop processing new split when in low memory" Oct 4, 2023
@jainxrohit jainxrohit self-requested a review October 4, 2023 16:48
@jainxrohit jainxrohit (Contributor) left a comment

Looks good to me. Let's also mention in the description the follow-up change to the memory framework that we are planning.

taskExecutor.setLowMemory(true);
}
}
else {
Contributor

nit: else and if can be merged.
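
i.e. something along these lines (a sketch of the same logic from the diff above, logging omitted):

    // Sketch: nested check collapsed into an else-if
    if (usedMemory > memoryThreshold) {
        if (!taskExecutor.isLowMemory()) {
            taskExecutor.setLowMemory(true);
        }
    }
    else if (taskExecutor.isLowMemory()) {
        taskExecutor.setLowMemory(false);
    }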


private final TimeStat blockedQuantaWallTime = new TimeStat(MICROSECONDS);
private final TimeStat unblockedQuantaWallTime = new TimeStat(MICROSECONDS);

private volatile boolean closed;

private volatile boolean lowMemory;
Contributor

Please add a comment saying it should only be updated from one thread, the lowMemoryExecutor.
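
For example, something along these lines (comment wording is a suggestion):

    // Only updated by the single lowMemoryExecutor monitoring thread;
    // volatile visibility is sufficient here, no CAS needed.
    private volatile boolean lowMemory;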

@ajaygeorge ajaygeorge (Contributor) left a comment

LGTM

@ajaygeorge (Contributor)

Please address the checkstyle failures as well.

Introducing heap memory usage based slowdown of the worker. When worker memory utilization increases past a certain threshold,
the worker will stop processing new splits until memory usage goes back below the threshold. This helps avoid the scenario where
worker heap usage is already too high but the worker still has more work to do, resulting in an out of memory error.
@swapsmagic swapsmagic force-pushed the worker_to_process_based_on_heap_usage branch from 2f4c54a to ff3cbb8 Compare October 4, 2023 18:56
@swapsmagic swapsmagic merged commit 0010d55 into prestodb:master Oct 5, 2023
@tdcmeehan (Contributor)

As I mentioned, local slowdown of splits may cause distributed issues. Please, can we follow up this PR with two things:

  1. Given the experimental nature of the changes, please prefix the config with experimental.
  2. Please add a caveat in the ConfigDescription that this config setting may induce cluster slowdown or deadlock in certain conditions, at least while the experimental prefix remains.
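
A sketch of what that follow-up might look like (the exact property name, owning config class, and caveat wording here are assumptions):

    import com.facebook.airlift.configuration.Config;
    import com.facebook.airlift.configuration.ConfigDescription;

    // Sketch only: illustrative experimental-prefixed name and caveat text
    @Config("experimental.task.memory-based-slowdown-threshold")
    @ConfigDescription("Experimental: may induce cluster slowdown or deadlock in certain conditions")
    public TaskManagerConfig setMemoryBasedSlowDownThreshold(double memoryBasedSlowDownThreshold)
    {
        this.memoryBasedSlowDownThreshold = memoryBasedSlowDownThreshold;
        return this;
    }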

@swapsmagic (Contributor Author)


PR #21053 addresses it. It is blocked because the build is not succeeding, for unrelated reasons.

@wanglinsong wanglinsong mentioned this pull request Dec 8, 2023