Managing memory usage during query execution #3
Conversation
Thanks @yjshen -- I hope to review this carefully tomorrow
Thank you @yjshen. I read some of this PR carefully but did not review the entire thing.
I really like how you used the external sort as a motivating example for the memory manager. Given the variety of places that DataFusion runs, I think it is unlikely we'll get the details of a partition-aware memory manager working well at first -- starting with some sort of simpler strategy (like evenly dividing the memory across all operators) might make sense.
In terms of where to go next, here is what I suggest:
- We introduce the `RuntimeEnv` and the parts of the memory manager API needed to allocate and deallocate memory (e.g. something like `MemoryConsumer::allocate()` and `MemoryConsumer::release()`) -- see the sketch after this list
- We add the appropriate calls (as we can) to all the operators to simply report what they are using
- As a follow-on we can start to get fancier and calculate / estimate memory budgets for things like external sort
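As a rough illustration of the first two steps, such an API might look like this minimal sketch (only the `RuntimeEnv`, `MemoryConsumer::allocate()`, and `MemoryConsumer::release()` names come from the suggestion above; everything else is hypothetical):

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical sketch: a RuntimeEnv owning one shared pool of memory.
pub struct RuntimeEnv {
    pool_size: usize,
    used: Mutex<usize>,
}

impl RuntimeEnv {
    pub fn new(pool_size: usize) -> Arc<Self> {
        Arc::new(Self { pool_size, used: Mutex::new(0) })
    }
}

/// Hypothetical per-operator handle used to report allocations.
pub struct MemoryConsumer {
    env: Arc<RuntimeEnv>,
}

impl MemoryConsumer {
    pub fn new(env: Arc<RuntimeEnv>) -> Self {
        Self { env }
    }

    /// Record `bytes` as allocated; errors when the pool is exhausted.
    pub fn allocate(&self, bytes: usize) -> Result<(), String> {
        let mut used = self.env.used.lock().unwrap();
        if *used + bytes > self.env.pool_size {
            return Err(format!("cannot allocate {} bytes", bytes));
        }
        *used += bytes;
        Ok(())
    }

    /// Record `bytes` as released back to the pool.
    pub fn release(&self, bytes: usize) {
        let mut used = self.env.used.lock().unwrap();
        *used = used.saturating_sub(bytes);
    }
}
```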
let input = self.input.execute(partition).await?;
external_sort(input, partition, self.expr.clone(), RUNTIME_ENV.clone()).await
In terms of getting the memory manager into the actual plans, one thing that might be worth looking at is putting it on the `ExecutionContextState` object (which the physical planner has access to). You could potentially add the `RuntimeEnv` reference to that struct and then pass it into `ExternalSortExec::new()`.
Alternatively, as I think we envision all operators getting memory from the `RuntimeEnv`, adding it as a new parameter to `ExecutionPlan::execute()` might be the clearest thing to do (and then it would be clear that all execution plan nodes have this structure).
https://github.com/yjshen/arrow-datafusion/blob/mm/datafusion/src/physical_plan/planner.rs#L299
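A minimal sketch of that second alternative, assuming the `async_trait` crate and stand-in types (the actual `ExecutionPlan` trait in DataFusion differs; this only illustrates the extra `RuntimeEnv` parameter):

```rust
use std::sync::Arc;

// Hypothetical stand-ins so the sketch compiles on its own.
pub struct RuntimeEnv;
pub struct SendableRecordBatchStream;
pub type Result<T> = std::result::Result<T, String>;

#[async_trait::async_trait]
pub trait ExecutionPlan: Send + Sync {
    /// The runtime is passed to every operator explicitly, making it clear
    /// that all execution plan nodes obtain memory from the same place.
    async fn execute(
        &self,
        partition: usize,
        runtime: Arc<RuntimeEnv>,
    ) -> Result<SendableRecordBatchStream>;
}
```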
use std::sync::Arc;

lazy_static! {
    /// Employ lazy static temporarily for RuntimeEnv, to avoid plumbing it through
this is a smart idea for prototyping 👍
/// MergeSort in-mem batches as well as spills into total order with `SortPreservingMergeStream` (SPMS).
/// Always put the in-mem batch based stream at idx 0 in SPMS so that we can spill
/// the stream when `spill()` is called on us.
async fn sort(&self) -> Result<SendableRecordBatchStream> {
I don't fully follow the logic here (it may be because I didn't follow all the async stream stuff correctly).
Normally I have seen external sort implemented like this (a sketch follows below):
- Sort gets a maximum allowable memory budget
- Sort reads input until it is over its budget, at which point it sorts the buffered data and writes it to an external file
- The in-memory buffer is cleared and input reading resumes until the input is done or the budget is exhausted again
- Once the input is done, the last buffer is sorted and then an N-way merge is done over the files from disk
I can't quite map that algorithm to this code 🤔
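For reference, the classic algorithm in the list above might look like this minimal sketch, with `Vec<i32>` standing in for record batches and in-memory runs standing in for spill files (everything here is hypothetical, not DataFusion's API):

```rust
/// Sketch of the classic external sort described above.
fn external_sort(input: impl Iterator<Item = Vec<i32>>, budget: usize) -> Vec<i32> {
    let mut runs: Vec<Vec<i32>> = Vec::new(); // stand-in for files on disk
    let mut buffer: Vec<i32> = Vec::new();
    let mut used = 0usize;

    for batch in input {
        used += batch.len() * std::mem::size_of::<i32>();
        buffer.extend(batch);
        if used > budget {
            // Budget exhausted: sort the buffer and "spill" it as one run.
            buffer.sort_unstable();
            runs.push(std::mem::take(&mut buffer));
            used = 0;
        }
    }
    // Sort whatever is left as the final run.
    if !buffer.is_empty() {
        buffer.sort_unstable();
        runs.push(buffer);
    }

    // N-way merge of the sorted runs via a min-heap.
    use std::cmp::Reverse;
    use std::collections::BinaryHeap;
    let mut heap = BinaryHeap::new();
    let mut iters: Vec<_> = runs.into_iter().map(|r| r.into_iter()).collect();
    for (i, it) in iters.iter_mut().enumerate() {
        if let Some(v) = it.next() {
            heap.push(Reverse((v, i)));
        }
    }
    let mut out = Vec::new();
    while let Some(Reverse((v, i))) = heap.pop() {
        out.push(v);
        if let Some(next) = iters[i].next() {
            heap.push(Reverse((next, i)));
        }
    }
    out
}

fn main() {
    let batches = vec![vec![5, 1, 9], vec![3, 7], vec![2, 8, 4, 6]];
    assert_eq!(external_sort(batches.into_iter(), 16), (1..=9).collect::<Vec<_>>());
}
```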
The external sort here is a little different: it acquires memory budget on a per-record-batch basis, instead of getting a huge budget of fixed size ahead of real usage. By acquiring memory one record batch at a time, we get finer control over how much memory is really used in the system. Therefore, the external sort implementation here is (see the sketch after this list):
- Get one record batch from the input, acquire a memory budget of its size from the memory manager, sort the batch, and insert it into a vector that holds all in-memory, buffered batches.
- When the external sorter is required to spill to disk (triggered by other consumers, or by itself when no memory is available), it first checks whether it holds in-memory buffered batches; if so, it does a heap sort to merge all in-memory, self-sorted batches into a partial order, writes the result to a file, and frees all the memory the vector holds.
- When all input batches have been seen, do a merge sort of the in-memory batches with all the spills, resulting in a final, total order.
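A minimal sketch of this per-batch strategy, again with `Vec<i32>` standing in for record batches, a hypothetical `Pool` standing in for the memory manager, and in-memory runs standing in for spill files:

```rust
/// Hypothetical stand-in for the memory manager.
struct Pool {
    capacity: usize,
    used: usize,
}

impl Pool {
    /// Try to grow the reservation by `bytes`; false means "no memory, spill".
    fn try_allocate(&mut self, bytes: usize) -> bool {
        if self.used + bytes <= self.capacity {
            self.used += bytes;
            true
        } else {
            false
        }
    }
    fn release(&mut self, bytes: usize) {
        self.used = self.used.saturating_sub(bytes);
    }
}

/// Returns the sorted runs (buffered batches plus spills); the final
/// total-order merge of these runs is omitted here.
fn sort_with_per_batch_budget(
    input: impl Iterator<Item = Vec<i32>>,
    pool: &mut Pool,
) -> Vec<Vec<i32>> {
    let mut in_mem: Vec<Vec<i32>> = Vec::new(); // self-sorted buffered batches
    let mut buffered_bytes = 0usize;
    let mut spills: Vec<Vec<i32>> = Vec::new(); // stand-in for files on disk

    for mut batch in input {
        let bytes = batch.len() * std::mem::size_of::<i32>();
        if !pool.try_allocate(bytes) {
            // Spill: merge the self-sorted in-memory batches into one
            // partially ordered run, write it out, and free the memory.
            let mut run: Vec<i32> = in_mem.drain(..).flatten().collect();
            run.sort_unstable(); // stands in for the heap-based merge
            spills.push(run);
            pool.release(buffered_bytes);
            buffered_bytes = 0;
            assert!(pool.try_allocate(bytes), "pool smaller than one batch");
        }
        batch.sort_unstable(); // each batch is sorted on arrival
        buffered_bytes += bytes;
        in_mem.push(batch);
    }

    spills.extend(in_mem);
    spills
}
```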
Makes sense @yjshen -- I think there is a tradeoff here:
If memory is allocated "on demand" for each record batch as you describe in this ticket, one operator can effectively consume all the memory, causing others to spill even when they might not need to. However, the on-demand strategy does work well to ensure that all allocated memory is actually used.
For example, if you have a plan like
Sort
  GroupByHash
and you allocate memory on demand, it is possible that the group by hash consumes most/all of the available memory (and still requires spilling), so by the time the sort runs, it can't get much/any memory and spills unnecessarily.
However, it is true that if memory is allocated up front, there is a very real risk of allocating more memory to the sort or group by hash than can be used (resulting in waste).
In conclusion, I think the algorithm you describe sounds like a good one.
For the Sort -> GroupBy -> ParquetScan case, and for most operators, the memory usage pattern is clear: a growing phase followed by a shrinking phase.
Take group by as an example: it uses up all available memory and requires spills while building the hash table, i.e., the growing phase. When all inputs have been inserted, it enters a shrinking phase; it may still hold some memory for output, but it is also spillable, by first writing its aggregation result to a file that is then read by its successor. Memory usage can only decrease from then on.
However, operators that output while still buffering input, e.g., a window operator, do not behave in this grow-then-shrink way. And if we have several windows chained in a single stage, the spilling behavior is likely to be strange: frequent spills for a single operator, and difficulty making progress for the entire stage. (This applies to Spark's memory management as well as to my current implementation.)
Two available solutions for this case are:
- Make the allocation strategy priority-aware, i.e., when insufficient memory is observed, only trigger consumers of a lower priority to spill, and throttle processing speed by starving the child operators (a sketch follows below).
- An executor-aware scheduler, as brought up several times in the community, which throttles record/batch processing speed for different operators by scheduling them with adjustable speed.
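As a rough illustration of the first option, a priority-aware spill policy might look like this minimal sketch (all names and the priority scheme here are hypothetical):

```rust
/// Hypothetical view of a memory consumer as seen by the spill policy.
struct Consumer {
    id: usize,
    priority: u8,     // higher value = more important, spilled last
    spillable: usize, // bytes this consumer could free by spilling
}

/// Pick consumers to spill, lowest priority first, until `needed` bytes
/// could be freed; never spill consumers at or above the requester's
/// priority. Returns the ids of the chosen victims.
fn pick_spill_victims(
    consumers: &mut [Consumer],
    requester_priority: u8,
    needed: usize,
) -> Vec<usize> {
    consumers.sort_by_key(|c| c.priority);
    let mut freed = 0usize;
    let mut victims = Vec::new();
    for c in consumers.iter() {
        if c.priority >= requester_priority || freed >= needed {
            break;
        }
        freed += c.spillable;
        victims.push(c.id);
    }
    victims
}
```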
> Take group by as an example: it uses up all available memory and requires spills while building the hash table, i.e., the growing phase. When all inputs have been inserted, it enters a shrinking phase; it may still hold some memory for output, but it is also spillable, by first writing its aggregation result to a file that is then read by its successor. Memory usage can only decrease from then on.
The classic group-by spill algorithm I am familiar with spills groups to disk, sorted by group key, when the hash table memory is exhausted.
Then, data is output by merging the sorted runs from disk.
Upon thinking about this, it does seem reasonable that the merge phase can end up using much less memory 👍
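A minimal sketch of that classic spilling group-by, with a `HashMap` of running sums standing in for the hash aggregate and in-memory runs standing in for files (all names hypothetical):

```rust
use std::collections::HashMap;

/// When the hash table is "full", its groups are written out sorted by key;
/// output is produced by merging the sorted runs and combining partial sums.
fn grouped_sum(
    input: impl Iterator<Item = (String, i64)>,
    max_groups: usize,
) -> Vec<(String, i64)> {
    let mut table: HashMap<String, i64> = HashMap::new();
    let mut runs: Vec<Vec<(String, i64)>> = Vec::new(); // stand-in for files

    for (key, value) in input {
        *table.entry(key).or_insert(0) += value;
        if table.len() > max_groups {
            // Memory exhausted: spill the groups sorted by key.
            let mut run: Vec<_> = table.drain().collect();
            run.sort_by(|a, b| a.0.cmp(&b.0));
            runs.push(run);
        }
    }
    let mut run: Vec<_> = table.drain().collect();
    run.sort_by(|a, b| a.0.cmp(&b.0));
    runs.push(run);

    // Merge sorted runs; equal keys become adjacent, so partial sums combine
    // with only one group in memory at a time.
    let mut all: Vec<_> = runs.into_iter().flatten().collect();
    all.sort_by(|a, b| a.0.cmp(&b.0)); // stands in for the streaming k-way merge
    let mut out: Vec<(String, i64)> = Vec::new();
    for (k, v) in all {
        match out.last_mut() {
            Some((lk, lv)) if *lk == k => *lv += v,
            _ => out.push((k, v)),
        }
    }
    out
}
```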
> However, operators that output while still buffering input, e.g., a window operator, do not behave in this grow-then-shrink way.
That is a good point
datafusion/src/error.rs
@@ -61,6 +61,9 @@ pub enum DataFusionError {
    /// Error returned during execution of the query.
    /// Examples include files not found, errors in parsing certain types.
    Execution(String),
    /// This error is thrown when a consumer cannot acquire memory from the Memory Manager
    /// we can just cancel the execution of the partition.
    OutOfMemory(String),
-    OutOfMemory(String),
+    ResourcesExhausted(String),
I think we may eventually have other resources that could be exhausted (like temp disk / object store space as well as file handles)
👍
/// Spill at least `size` bytes to disk and update related counters
async fn spill(&self, size: usize, trigger: &MemoryConsumerId) -> Result<usize> {
I wonder if we could combine `spill()` with the actual allocation of a temp file (rather than the deallocation).
So, an interface like `fn reserve_spill(&self, size: usize) -> Result<filename>`, which would effectively reserve a spill file up to `size` in length.
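A minimal sketch of how that combined interface might behave (the `DiskManager` type and all details here are hypothetical):

```rust
use std::fs::File;
use std::io;
use std::path::PathBuf;

/// Hypothetical manager of spill-file space.
struct DiskManager {
    dir: PathBuf,
    reserved: u64,
    capacity: u64,
    next_id: u64,
}

impl DiskManager {
    /// Reserve up to `size` bytes of spill space and hand back the file
    /// the caller should spill into; reservation and file creation are
    /// a single step.
    fn reserve_spill(&mut self, size: u64) -> io::Result<PathBuf> {
        if self.reserved + size > self.capacity {
            return Err(io::Error::new(io::ErrorKind::Other, "spill space exhausted"));
        }
        self.reserved += size;
        let path = self.dir.join(format!("spill-{}.tmp", self.next_id));
        self.next_id += 1;
        File::create(&path)?; // create the temp file up front
        Ok(path)
    }
}
```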
Let me try this
/// Try allocate `required` bytes as needed
async fn allocate(&self, required: usize) -> Result<()> {
Have you considered some sort of RAII mechanism here (to help ensure allocations match up with deallocations)?
Something like

-    async fn allocate(&self, required: usize) -> Result<()> {
+    async fn allocate(&self, required: usize) -> Result<ResourceReservation> {

and

struct ResourceReservation { /* ... */ }

impl Drop for ResourceReservation {
    fn drop(&mut self) {
        // release the reservation
    }
}
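Filled out, such an RAII reservation might look like this runnable sketch (the `Pool` type and the `allocate` free function are hypothetical stand-ins):

```rust
use std::sync::{Arc, Mutex};

/// Hypothetical pool so the sketch is self-contained.
struct Pool {
    used: Mutex<usize>,
}

/// RAII guard: holding it means holding the memory; dropping it releases.
struct ResourceReservation {
    pool: Arc<Pool>,
    bytes: usize,
}

impl Drop for ResourceReservation {
    fn drop(&mut self) {
        // Release the reservation automatically, even on early return or error.
        *self.pool.used.lock().unwrap() -= self.bytes;
    }
}

fn allocate(pool: &Arc<Pool>, required: usize) -> ResourceReservation {
    *pool.used.lock().unwrap() += required;
    ResourceReservation { pool: Arc::clone(pool), bytes: required }
}

fn main() {
    let pool = Arc::new(Pool { used: Mutex::new(0) });
    {
        let _r = allocate(&pool, 1024);
        assert_eq!(*pool.used.lock().unwrap(), 1024);
    } // `_r` dropped here; memory released
    assert_eq!(*pool.used.lock().unwrap(), 0);
}
```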
I hadn't thought about it before -- great idea 👍
    self.id().partition_id
}

/// Try allocate `required` bytes as needed
As I mentioned in the design doc, the distinction between "allocate the `required` bytes that I need to operate" and "give me a budget of extra memory I can use to work within" is an important one.
External sort can still produce an answer with a small amount of memory, but will potentially go faster with a larger amount.
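One way that distinction might surface in an API (a sketch; both method names are hypothetical):

```rust
/// Hypothetical split of the two kinds of requests described above.
trait MemoryManager {
    /// "Allocate the bytes I need to operate" -- a hard requirement that
    /// fails (and may trigger spilling elsewhere) if it cannot be satisfied.
    fn allocate_required(&self, bytes: usize) -> Result<(), String>;

    /// "Give me extra memory to work within" -- a soft budget request; the
    /// grant may be anywhere from 0 up to `max` bytes, and the caller must
    /// be able to proceed (more slowly) with whatever it gets.
    fn request_budget(&self, max: usize) -> usize;
}
```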
As explained in the sort implementation above, memory consumers should try to allocate another chunk of memory in order to continue in-memory computation; if no more memory is available, the allocate call will trigger the partition manager (or just the memory manager, if we combine the two) to find suitable memory consumer(s) and trigger spills.
So is the suggestion here to adjust the doc, or is a different design being suggested?
After thinking about this some more (see comment above), I don't have a specific suggestion. Sorry for the confusion.
Hi @alamb, sorry for the really late reply. I was prototyping sort-merge join, sort-based aggregate, and a spillable repartition on top of this initial primitive, and also digging into the actual reason for Spark's problematic memory control that always results in OOM in our production workload (since the prototyping here is mainly based on Spark's). The key finding is: much of the memory consumed during Spark's execution is unmanaged, and this includes what you mentioned
Therefore, my current thinking is:
And the resulting memory space usage will be:
Another strategy I have seen for this problem (basically, internal buffers not being accounted for properly) is a "slop" estimate for each operator: something like "only allocate 80% of available resources, and assume that 20% will be used by internal buffers, batches, etc." Allocating / reporting per operator as you describe above is also a reasonable strategy -- the issue is that the reporting will likely always diverge from the actual implementation. Perhaps both strategies are needed for a real system.
This is something I think Rust and arrow-rs can help with.
While talking about
I witnessed extreme cases where we set aside only 40%~50% of available resources for memory consumers but still suffered OOM. The chances are not rare, especially for chained window operators inside a single stage. 😂 I agree a "slop" parameter is still needed in our case; I will add one.
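A minimal sketch of such a "slop" parameter (the 80%/20% split is just the example figure from the discussion above; the function name is hypothetical):

```rust
/// Hand tracked consumers only a fraction of total memory, assuming the rest
/// is eaten by unaccounted internal buffers, batches, etc.
fn managed_pool_size(total_memory: usize, slop_fraction: f64) -> usize {
    (total_memory as f64 * (1.0 - slop_fraction)) as usize
}

fn main() {
    // 20% slop: only 80% of memory is handed to tracked consumers.
    assert_eq!(managed_pool_size(1_000_000, 0.2), 800_000);
}
```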
Closes apache#587
This PR includes the implementation of `ExternalSorter`. For ease of comprehension, check https://docs.google.com/document/d/1BT5HH-2sKq-Jxo51PNE6l9NNd_F-FyyYcyC3SKTnkIA/edit?usp=sharing for the design.