Initial MemoryManager and DiskManager APIs for query execution + External Sort implementation #1526
Thanks @yjshen -- I'll try and give this a good look over the weekend
Very exciting stuff, thanks @yjshen :)
/// Initialize
pub(crate) fn initialize(self: &Arc<Self>) {
    let manager = self.clone();
    let handle = task::spawn(async move {
Am I correct that this background refresh process is needed because memory updates for tracking consumers are managed internally instead of through MemoryManager?
Yes, I'm creating a background task that runs periodically to update the tracking consumers' total memory usage, so that controlling consumers do not have to ask for available memory frequently.
I haven't put much thought into this yet, but I am curious what your thoughts are on having tracking consumers also report memory usage updates directly to the memory manager? Basically similar to what we have with the controlling consumers, but without the capability to force them to spill.
The main reason is to reduce interaction with the memory manager during execution, which reduces complexity and eliminates the need for synchronization.
- For tracking consumers that were converted from controlling consumers: for example, the hashtable size / partial sort in-mem size is known when the tracking consumer is created or transformed, so there is no further need for them to acquire memory or interact with the memory manager.
- For other tracking consumers with internal computational buffers: one can report its usage by simply updating its internal state mem_used, with no extra function calls or interaction with the memory manager during execution.
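As a rough illustration of the second bullet, a tracking consumer could keep its usage in an atomic counter that the manager reads on demand. This is a minimal sketch under assumptions: the type `TrackingUsage` and its methods `add` / `release` are hypothetical names, not the PR's actual code; only the `mem_used` name comes from the discussion.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};

/// Hypothetical self-reporting usage counter for a tracking consumer.
/// The consumer updates it locally; nothing here calls the memory manager.
pub struct TrackingUsage {
    mem_used: AtomicUsize,
}

impl TrackingUsage {
    pub fn new() -> Self {
        Self { mem_used: AtomicUsize::new(0) }
    }

    /// Record that `bytes` more memory is held by an internal buffer.
    pub fn add(&self, bytes: usize) {
        self.mem_used.fetch_add(bytes, Ordering::Relaxed);
    }

    /// Record that `bytes` were freed, saturating at zero instead of wrapping.
    pub fn release(&self, bytes: usize) {
        let _ = self
            .mem_used
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |v| {
                Some(v.saturating_sub(bytes))
            });
    }

    /// Current usage, readable by whoever polls the consumer.
    pub fn mem_used(&self) -> usize {
        self.mem_used.load(Ordering::Relaxed)
    }
}
```

The point of the sketch is only that updating the counter is a local operation; whether the real consumers use atomics or plain fields is not stated in the thread.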
I think the idea of periodically polling tracking consumers is reasonable.
I am a little worried about a task that polls based on some clock interval, however -- it is likely that the frequency will be too fast or too slow.
What about updating tracking consumers on every call to try_grow, or on every query to the memory manager for total memory used?
There is no longer a maintained tracker_total, and there are no more background maintenance tasks. The memory manager now computes the total tracker memory each time its can_grow is called.
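The on-demand approach described here can be sketched roughly as follows. Everything in this block is an assumption made for illustration: `SketchMemoryManager`, `register_tracker`, `FixedUsage`, and the `can_grow` signature are hypothetical and may differ from what the PR implements. The sketch holds trackers through `Weak` references, which is one possible way to avoid a reference cycle, not necessarily the PR's.

```rust
use std::sync::{Arc, Mutex, Weak};

/// Minimal stand-in for the consumer trait: only the usage report is modeled.
pub trait MemoryConsumer: Send + Sync {
    fn mem_used(&self) -> usize;
}

/// Hypothetical manager that keeps no cached tracker total.
pub struct SketchMemoryManager {
    pool_size: usize,
    trackers: Mutex<Vec<Weak<dyn MemoryConsumer>>>,
}

impl SketchMemoryManager {
    pub fn new(pool_size: usize) -> Self {
        Self { pool_size, trackers: Mutex::new(Vec::new()) }
    }

    /// Remember a tracking consumer without keeping it alive.
    pub fn register_tracker(&self, consumer: &Arc<dyn MemoryConsumer>) {
        self.trackers.lock().unwrap().push(Arc::downgrade(consumer));
    }

    /// Sum the usage of all live trackers at the moment of the call.
    fn tracker_total(&self) -> usize {
        let mut trackers = self.trackers.lock().unwrap();
        trackers.retain(|w| w.strong_count() > 0); // drop finished consumers
        trackers
            .iter()
            .filter_map(|w| w.upgrade())
            .map(|c| c.mem_used())
            .sum()
    }

    /// Decide whether a controlling consumer may grow by `required` bytes,
    /// given what controlling consumers already hold.
    pub fn can_grow(&self, controlling_used: usize, required: usize) -> bool {
        let available = self.pool_size.saturating_sub(self.tracker_total());
        controlling_used.saturating_add(required) <= available
    }
}

/// Trivial tracking consumer used to exercise the sketch.
pub struct FixedUsage(pub usize);

impl MemoryConsumer for FixedUsage {
    fn mem_used(&self) -> usize {
        self.0
    }
}
```

Because the total is recomputed on each call, there is no polling interval to tune; the cost is a pass over the registered trackers per request.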
@yjshen thanks, this is a milestone PR for the memory controller in DataFusion.
Thank you so much @yjshen
I think the memory manager interface in this PR (MemoryManager / MemoryConsumer) is a good foundation going forward.
Prior to merging this PR I would like to see:
- The ref count cycle between the memory manager and execution plans addressed; otherwise I think this PR could be merged into DataFusion as is and we could iterate from there
- Some tests for MemoryManager and ExternalSorter (as suggested in the PR comments here)
I also think it is worth removing / reconsidering the background loop for tracked memory consumers as well, though since it isn't used yet I don't think it is critical to remove prior to merging this PR
But again, really nice and thank you for this contribution
}

/// Register a new memory consumer for memory usage tracking
pub(crate) fn register_consumer(self: &Arc<Self>, consumer: Arc<dyn MemoryConsumer>) {
I didn't see any code that registered any Tracking consumers yet.
In terms of plumbing, what do you think about:
- Making all ExecutionPlans MemoryConsumers and providing default implementations (that report 0 usage)
- Registering all ExecutionPlans somehow as MemoryConsumers as part of physical plan creation?
That way all implementations of ExecutionPlan could report their usage without having to explicitly register themselves with the memory manager. Also, the manager could report on how many operators were not providing any statistics, etc.
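The "default implementation that reports 0 usage" idea could look something like the sketch below. It is illustrative only: this `MemoryConsumer` trait is a simplified stand-in, and `ProjectionLike`, `SortLike`, and `count_unreported` are hypothetical names that do not appear in the PR.

```rust
/// Simplified stand-in for a consumer trait with a zero-usage default.
pub trait MemoryConsumer {
    fn name(&self) -> String;

    /// Operators that do not track memory inherit this and report zero.
    fn mem_used(&self) -> usize {
        0
    }
}

/// Hypothetical operator that never overrides the default.
pub struct ProjectionLike;

impl MemoryConsumer for ProjectionLike {
    fn name(&self) -> String {
        "ProjectionLike".to_string()
    }
}

/// Hypothetical operator that buffers data and reports it.
pub struct SortLike {
    pub buffered_bytes: usize,
}

impl MemoryConsumer for SortLike {
    fn name(&self) -> String {
        "SortLike".to_string()
    }

    fn mem_used(&self) -> usize {
        self.buffered_bytes
    }
}

/// Count consumers currently reporting zero usage, as a rough proxy for
/// "operators not providing any statistics".
pub fn count_unreported(consumers: &[Box<dyn MemoryConsumer>]) -> usize {
    consumers.iter().filter(|c| c.mem_used() == 0).count()
}
```

Note that a zero report cannot distinguish "not tracked" from "tracked and currently empty"; a real design might want an explicit flag for that.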
@houqp @alamb Thanks for your detailed and insightful review! Resolved:
To discuss:
There is one in
I think there is a gap between ExecPlan and MemoryConsumer. Since an
/// Trait for types that stream [arrow::record_batch::RecordBatch]
pub trait RecordBatchStream: Stream<Item = ArrowResult<RecordBatch>> + MemoryConsumer {
    /// Returns the schema of this `RecordBatchStream`.
    ///
    /// Implementation of this trait should guarantee that all `RecordBatch`'s returned by this
    /// stream should have the same schema as returned from this method.
    fn schema(&self) -> SchemaRef;
}
/// Trait for a stream of record batches.
pub type SendableRecordBatchStream = Pin<Arc<dyn RecordBatchStream + Send + Sync>>;
Should I make SendableRecordBatchStream pin arc instead of pin box and register each stream arc to runtime at each
pub fn register_consumer(&self, memory_consumer: &Arc<dyn MemoryConsumer>) {
may sometimes be awkward:
runtime.register_consumer(&(streams.clone() as Arc<dyn MemoryConsumer>));
Any thoughts?
This is a good point (that the memory management is done on a per-partition basis rather than a per
I would recommend we don't change
I will make time today to review this PR again thoroughly -- thank you @yjshen. I think we are close
Not fully caught up, but how would you consume from such a thing? You need a mutable reference to poll a stream. Streams, like iterators, are not meant to be shared.
As an aside, the Sync constraint on SendableRecordBatchStream is potentially extraneous for this reason: you can't do much with a shared stream anyway, so requiring share-ability between threads imposes unnecessary implementation constraints.
@tustvold Thanks for bringing it up. I find the stream a single place to have all runtime entities be auto-registered to the memory manager at once. Maybe a wrapper over the stream could achieve the goal?
My instinct would be to suggest having the shared ref internal to the stream implementation, instead of a wrapper. Otherwise I suspect you will run into borrow checker, pinning, and async pain. This would also avoid needing to make breaking changes to SendableRecordBatchStream?
Another thing to potentially think about is that many of the operators aren't actually streams, rather they spawn a tokio task and then return an mpsc queue. There will need to be some accounting of both data buffered in the queue, and data in the operator's "task". My gut feeling is this is going to require adding some sort of RAII tracking field to
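The comment above is cut off before it says where the RAII field would live, so the following is only a guess at the general pattern: a guard value that adds to a shared counter when created and subtracts when dropped. `MemoryReservation` and `TrackedBatch` are hypothetical names, and attaching the guard to a batch-like value is an assumption of this sketch.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical RAII guard: accounts for `bytes` while it is alive.
pub struct MemoryReservation {
    pool_used: Arc<AtomicUsize>,
    bytes: usize,
}

impl MemoryReservation {
    pub fn new(pool_used: Arc<AtomicUsize>, bytes: usize) -> Self {
        pool_used.fetch_add(bytes, Ordering::Relaxed);
        Self { pool_used, bytes }
    }
}

impl Drop for MemoryReservation {
    fn drop(&mut self) {
        // Runs wherever the value is dropped: in a queue, a task, or a stream.
        self.pool_used.fetch_sub(self.bytes, Ordering::Relaxed);
    }
}

/// Hypothetical payload that carries its own accounting with it.
pub struct TrackedBatch {
    pub rows: Vec<i64>,
    _reservation: MemoryReservation,
}

impl TrackedBatch {
    pub fn new(rows: Vec<i64>, pool_used: Arc<AtomicUsize>) -> Self {
        let bytes = rows.len() * std::mem::size_of::<i64>();
        Self { rows, _reservation: MemoryReservation::new(pool_used, bytes) }
    }
}
```

With this shape, data sitting in an mpsc queue stays accounted for until the receiver drops it, without the operator having to report it explicitly.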
I reviewed the memory manager changes -- I think it is good enough to start with and we can iterate from there.
I didn't get a chance to fully review the changes to sort_preserving_merge -- will keep at it tomorrow.
cc @tustvold
/// Merge buffered, self-sorted record batches to get an order.
///
/// Internally, it uses MinHeap to reduce extra memory consumption
/// by not concatenating all batches into one and sorting it as done by `SortExec`.
I need to study the connection between SortExec, SortPreservingMergeStream and InMemSortStream some more to fully understand this. I can't help but think that InMemSortStream is doing the same thing as SortPreservingMergeStream -- and I wonder if we can reuse that same code
The main difference between InMemSortStream and SortPreservingMergeStream lies in the number of "entities" each assumes it will merge (batches for IMSS and streams for SPMS).
Since InMemSort is meant to merge many more partially ordered "entities", the sorter should reduce the number of comparisons for each item popped. Hence a MinHeap was introduced.
On the other hand, InMemSort is more specialized in that each "entity" is only one record batch, so its logic is simpler than that of SortPreservingMergeStream, which has to consider stream continuation.
Currently, the common parts SortKeyCursor and RowIndex for both sorts are extracted to sorts/mod.rs to reduce duplication.
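To make the MinHeap argument concrete, here is a small k-way merge over already sorted runs using the standard library's `BinaryHeap` with `Reverse`. It is a generic sketch of the technique on plain `i64` vectors, not the PR's `InMemSortStream`; the function name `merge_sorted_runs` is made up for this example. Each pop costs O(log k) comparisons for k runs, instead of scanning every run head.

```rust
use std::cmp::Reverse;
use std::collections::BinaryHeap;

/// Merge individually sorted runs into one sorted vector with a min-heap.
/// Heap entries are (value, run index, row index within that run).
pub fn merge_sorted_runs(runs: &[Vec<i64>]) -> Vec<i64> {
    let mut heap = BinaryHeap::new();

    // Seed the heap with the first row of every non-empty run.
    for (run_idx, run) in runs.iter().enumerate() {
        if let Some(&first) = run.first() {
            heap.push(Reverse((first, run_idx, 0usize)));
        }
    }

    let mut out = Vec::with_capacity(runs.iter().map(Vec::len).sum());

    // Pop the smallest head, then advance within the run it came from.
    while let Some(Reverse((value, run_idx, row_idx))) = heap.pop() {
        out.push(value);
        let next = row_idx + 1;
        if let Some(&v) = runs[run_idx].get(next) {
            heap.push(Reverse((v, run_idx, next)));
        }
    }

    out
}
```

Because every run here is a fully materialized vector, there is no "wait for the next batch of this stream" case to handle, which mirrors the simplification described above for one-batch "entities".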
I also don't have a good solution to the SendableRecordBatchStream problem off the top of my head, will need to think more about it. Other than that, I think the change looks good as a first iteration 👍
Thank you again @yjshen for this great contribution.
I have some concerns about parts of this PR (listed below), but I still think we should merge this PR as is and handle the concerns as follow ons.
Thus my plan is to send a note to the dev mailing list and slack channel asking for comments (on the API specifically) and if we don't hear any major concerns I suggest we merge this PR tomorrow.
My rationale for merging despite these concerns is:
- This PR gets the necessary foundations in for limiting resources at runtime: specifically the RuntimeEnv, and MemoryConsumer, MemoryManager, and DiskManager APIs.
- It is backwards compatible (e.g. external sort is not connected to anything, so there should be no performance regressions)
I think it would be good to file a follow on PR marking the MemoryManager, DiskManager, and MemoryConsumer APIs as experimental and I will prepare such a PR.
My Concerns (aka major follow on work):
- External sorting is not connected to anything -- aka it is code that isn't used (yet)
- InMemSortStream and SortPreservingMergeStream are doing very much the same thing -- consolidating the code I think will be important as we move to optimize them
- As most of the rest of the system isn't connected to the memory system, the APIs may not be fully adequate (but we can iterate on that)
This PR also unlocks some cool follow on projects (like supporting external group by / group by hash / spill to disk) 🚗
FWIW I also plan to run the TPCH benchmarks on this PR and will post the results (I don't expect any changes)
Thanks, @yjshen, great work! I went through it briefly. BTW, some of the details are well handled, such as pool size.
let path = tmp_dir.path().to_str().unwrap().to_string();
std::mem::forget(tmp_dir);

Self {
It is better to define constants for the default values of batch_size and memory_fraction than to use bare numbers
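A sketch of what this suggestion might look like follows. The numeric values are placeholders chosen for illustration, not the defaults used in the PR, and `SketchRuntimeConfig` with its constants is a hypothetical stand-in for the real config type.

```rust
/// Placeholder default; the PR's actual value is not shown in this thread.
pub const DEFAULT_BATCH_SIZE: usize = 8192;
/// Placeholder default; the PR's actual value is not shown in this thread.
pub const DEFAULT_MEMORY_FRACTION: f64 = 0.7;

/// Hypothetical config struct carrying only the two fields discussed.
pub struct SketchRuntimeConfig {
    pub batch_size: usize,
    pub memory_fraction: f64,
}

impl Default for SketchRuntimeConfig {
    fn default() -> Self {
        // Named constants keep the defaults in one discoverable place.
        Self {
            batch_size: DEFAULT_BATCH_SIZE,
            memory_fraction: DEFAULT_MEMORY_FRACTION,
        }
    }
}
```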
I think this is already the default of the config and is meant to be overwritten if needed. And isn't this already a single place for these defaults?
BTW, I think it is worth reconsidering and restructuring the multiple configs and their usages: ExecutionConfig, PhysicalPlanConfig and RuntimeConfig.
At least we should not pass target_batch_size during query planning, since we already have RuntimeEnv plumbed through the execute() API now. I will create a follow-up PR once we've merged this one.
Makes sense to me. We can deal with it uniformly in a follow-up PR
Will you post the result in this review?
Yes, I will do so.
Ok, I am going to fire up my benchmark machine, get some numbers, and assuming they look good merge this PR
Here are the results of my comparison:
Setup:
Benchmark command:
cd benchmarks
cargo run --release --bin tpch -- benchmark datafusion --partitions 16 -m --iterations 10 --path /data/tpch_data_10G/ --format tbl --query 1
I think while not perfect this PR is a step in the right direction towards being able to handle queries that need to spill. 🚀 Thank you @yjshen
Shall I file follow on tickets for the next step? I am particularly interested in ensuring we consolidate the Sort code (so there is only a single sort operator that does in memory sorting if it has enough memory budget but then spills to disk if needed). I would enjoy helping make this happen (perhaps by writing some tests?)
Thank you all again for helping me with the initial document proposal as well as for the insightful reviews in this PR ❤️
Yes, that would be great! Please open the issues that you have in mind. I have a bunch of ideas for follow-ups as well. I think we already have the foundation for many exciting features to come. How would you like to open an umbrella issue as well as sub-task issues? I could file sub-task issues under it as well.
Thanks, I can do the initial consolidation; please join at any time or just take it over, depending on your schedule.
Which issue does this PR close?
Closes #587.
Rationale for this change
When DataFusion processes a single partition, it will keep allocating memory until the OS or the container system kills it. To make it worse, concurrently executing partitions or even simultaneously running plans will compete for available memory until all memory is exhausted. It then becomes even harder to meet the memory requirements of every operator in each running partition, and none of the partitions or plans would run to completion.
Therefore, the ability to control the total memory usage of the process as a whole, and at the same time, allocate the available memory to each execution partition is extremely important. Under this guarantee: when the memory is sufficient, the operator can acquire as much of the memory to do the computation; when the memory is tight, the operator can be downgraded to use the disk to store some intermediate results (spilling to disk) and use a limited memory for execution.
What changes are included in this PR?
The proposed memory management architecture is the following:
- RuntimeConfig.max_memory and RuntimeConfig.memory_fraction (float64 between 0..1). The actual max memory DataFusion could use: pool_size = max_memory * memory_fraction.
- Memory Consumers. Operators or others are encouraged to register themselves to the memory manager and report their usage through mem_used().
- Controlling consumers that would acquire memory during their execution and release memory through spill if no more memory is available.
- Tracking consumers that exist for reporting purposes to provide a more accurate memory usage estimation for memory consumers.
- (pool_size - all_tracking_used) / active_num_controlling_consumers.
Are there any user-facing changes?
Users could limit the max memory used for DataFusion through RuntimeConfig::max_memory and RuntimeConfig::memory_fraction
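The two formulas in the PR description, pool_size = max_memory * memory_fraction and (pool_size - all_tracking_used) / active_num_controlling_consumers, can be worked through with a small sketch. The function names are hypothetical, and two behaviors are assumptions of this example rather than something the description states: saturating at zero when tracking usage exceeds the pool, and returning zero when there are no controlling consumers.

```rust
/// pool_size = max_memory * memory_fraction, with the fraction in 0..=1.
pub fn pool_size(max_memory: usize, memory_fraction: f64) -> usize {
    assert!(
        (0.0..=1.0).contains(&memory_fraction),
        "memory_fraction must be between 0 and 1"
    );
    (max_memory as f64 * memory_fraction) as usize
}

/// (pool_size - all_tracking_used) / active_num_controlling_consumers.
/// Assumption: returns 0 when no controlling consumer is active, and never
/// goes negative if trackers already exceed the pool.
pub fn max_per_controlling_consumer(
    pool_size: usize,
    all_tracking_used: usize,
    active_num_controlling_consumers: usize,
) -> usize {
    if active_num_controlling_consumers == 0 {
        return 0;
    }
    pool_size.saturating_sub(all_tracking_used) / active_num_controlling_consumers
}
```

For example, with max_memory = 1000 bytes and memory_fraction = 0.5 the pool is 500 bytes; if tracking consumers hold 100 bytes and four controlling consumers are active, the formula yields 100 bytes per controlling consumer.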
Note
In addition to the proposed memory manager and the runtime plumbed through the execute API, an ExternalSortExec is implemented to illustrate the API usage.