-
Notifications
You must be signed in to change notification settings - Fork 3.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix memory calculations for WorkerMemoryParameters for machines with relatively less heap space #14117
Fix memory calculations for WorkerMemoryParameters for machines with relatively less heap space #14117
Conversation
final int superSorterMaxChannelsPerProcessor = maxNumFramesForSuperSorter / superSorterMaxActiveProcessors - 1; | ||
final int isSmallWorker = usableMemoryInJvm < SMALL_WORKER_CAPACITY_THRESHOLD_BYTES ? 1 : 0; | ||
// Apportion max frames to all processors equally, then subtract one to account for an output frame and one to account | ||
// for the durable storage's output frame in the supersorter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- Why does durable storage have an extra output frame?
- Also why is the subtraction valid for small heaps and not bigger heaps.
- Please add a test case for
WorkerMemoryParametersTest
throwing NotEnoughtMemoryFault for a smaller heap.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Updated with tests and the explanation in the comments. Regarding 2nd point, I have mentioned that we need to be conservative with the heap space in case of smaller heaps hence the subtraction is going on there and not in the case of larger heaps where we can have more parallelization without the risk of OOMing due to these temporary allocator factories.
final int superSorterMaxChannelsPerProcessor = maxNumFramesForSuperSorter / superSorterMaxActiveProcessors - 1; | ||
final int isSmallWorker = usableMemoryInJvm < SMALL_WORKER_CAPACITY_THRESHOLD_BYTES ? 1 : 0; | ||
// Apportion max frames to all processors equally, then subtract one to account for an output frame and one to account | ||
// for the durable storage's output frame in the supersorter |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this sound better? I think the comment is a bit confusing as in, it doesn't explain why these two frames does not need to be reserved for large workers, as they would use them too. We should add that to the comment as well.
// for the durable storage's output frame in the supersorter | |
// for the durable storage's output frame in the supersorter. The durable storage's output frame is only considered if the worker is small to be more conservative with memory requirements. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the changes! LGTM!
This patch reworks memory management to better support multi-threaded workers running in shared JVMs. There are two main changes. First, processing buffers and threads are moved from a per-JVM model to a per-worker model. This enables queries to hold processing buffers without blocking other concurrently-running queries. Changes: - Introduce ProcessingBuffersSet and ProcessingBuffers to hold the per-worker and per-work-order processing buffers (respectively). On Peons, this is the JVM-wide processing pool. On Indexers, this is a per-worker pool of on-heap buffers. (This change fixes a bug on Indexers where excessive processing buffers could be used if MSQ tasks ran concurrently with realtime tasks.) - Add "bufferPool" argument to GroupingEngine#process so a per-worker pool can be passed in. - Add "druid.msq.task.memory.maxThreads" property, which controls the maximum number of processing threads to use per task. This allows usage of multiple processing buffers per task if admins desire. - IndexerWorkerContext acquires processingBuffers when creating the FrameContext for a work order, and releases them when closing the FrameContext. - Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know how many sets of processing buffers are needed to run a given query. Second, adjustments to how WorkerMemoryParameters slices up bundles, to favor more memory for sorting and segment generation. Changes: - Instead of using same-sized bundles for processing and for sorting, workers now use minimally-sized processing bundles (just enough to read inputs plus a little overhead). The rest is devoted to broadcast data buffering, sorting, and segment-building. - Segment-building is now limited to 1 concurrent segment per work order. This allows each segment-building action to use more memory. Note that segment-building is internally multi-threaded to a degree. (Build and persist can run concurrently.) - Simplify frame size calculations by removing the distinction between "standard" and "large" frames. The new default frame size is the same as the old "standard" frames, 1 MB. The original goal of of the large frames was to reduce the number of temporary files during sorting, but I think we can achieve the same thing by simply merging a larger number of standard frames at once. - Remove the small worker adjustment that was added in apache#14117 to account for an extra frame involved in writing to durable storage. Instead, account for the extra frame whenever we are actually using durable storage. - Cap super-sorter parallelism using the number of output partitions, rather than using a hard coded cap at 4. Note that in practice, so far, this cap has not been relevant for tasks because they have only been using a single processing thread anyway.
* MSQ: Rework memory management. This patch reworks memory management to better support multi-threaded workers running in shared JVMs. There are two main changes. First, processing buffers and threads are moved from a per-JVM model to a per-worker model. This enables queries to hold processing buffers without blocking other concurrently-running queries. Changes: - Introduce ProcessingBuffersSet and ProcessingBuffers to hold the per-worker and per-work-order processing buffers (respectively). On Peons, this is the JVM-wide processing pool. On Indexers, this is a per-worker pool of on-heap buffers. (This change fixes a bug on Indexers where excessive processing buffers could be used if MSQ tasks ran concurrently with realtime tasks.) - Add "bufferPool" argument to GroupingEngine#process so a per-worker pool can be passed in. - Add "druid.msq.task.memory.maxThreads" property, which controls the maximum number of processing threads to use per task. This allows usage of multiple processing buffers per task if admins desire. - IndexerWorkerContext acquires processingBuffers when creating the FrameContext for a work order, and releases them when closing the FrameContext. - Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know how many sets of processing buffers are needed to run a given query. Second, adjustments to how WorkerMemoryParameters slices up bundles, to favor more memory for sorting and segment generation. Changes: - Instead of using same-sized bundles for processing and for sorting, workers now use minimally-sized processing bundles (just enough to read inputs plus a little overhead). The rest is devoted to broadcast data buffering, sorting, and segment-building. - Segment-building is now limited to 1 concurrent segment per work order. This allows each segment-building action to use more memory. Note that segment-building is internally multi-threaded to a degree. (Build and persist can run concurrently.) - Simplify frame size calculations by removing the distinction between "standard" and "large" frames. The new default frame size is the same as the old "standard" frames, 1 MB. The original goal of of the large frames was to reduce the number of temporary files during sorting, but I think we can achieve the same thing by simply merging a larger number of standard frames at once. - Remove the small worker adjustment that was added in #14117 to account for an extra frame involved in writing to durable storage. Instead, account for the extra frame whenever we are actually using durable storage. - Cap super-sorter parallelism using the number of output partitions, rather than using a hard coded cap at 4. Note that in practice, so far, this cap has not been relevant for tasks because they have only been using a single processing thread anyway. * Remove unused import. * Fix errorprone annotation. * Fixes for javadocs and inspections. * Additional test coverage. * Fix test.
* MSQ: Rework memory management. This patch reworks memory management to better support multi-threaded workers running in shared JVMs. There are two main changes. First, processing buffers and threads are moved from a per-JVM model to a per-worker model. This enables queries to hold processing buffers without blocking other concurrently-running queries. Changes: - Introduce ProcessingBuffersSet and ProcessingBuffers to hold the per-worker and per-work-order processing buffers (respectively). On Peons, this is the JVM-wide processing pool. On Indexers, this is a per-worker pool of on-heap buffers. (This change fixes a bug on Indexers where excessive processing buffers could be used if MSQ tasks ran concurrently with realtime tasks.) - Add "bufferPool" argument to GroupingEngine#process so a per-worker pool can be passed in. - Add "druid.msq.task.memory.maxThreads" property, which controls the maximum number of processing threads to use per task. This allows usage of multiple processing buffers per task if admins desire. - IndexerWorkerContext acquires processingBuffers when creating the FrameContext for a work order, and releases them when closing the FrameContext. - Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know how many sets of processing buffers are needed to run a given query. Second, adjustments to how WorkerMemoryParameters slices up bundles, to favor more memory for sorting and segment generation. Changes: - Instead of using same-sized bundles for processing and for sorting, workers now use minimally-sized processing bundles (just enough to read inputs plus a little overhead). The rest is devoted to broadcast data buffering, sorting, and segment-building. - Segment-building is now limited to 1 concurrent segment per work order. This allows each segment-building action to use more memory. Note that segment-building is internally multi-threaded to a degree. (Build and persist can run concurrently.) - Simplify frame size calculations by removing the distinction between "standard" and "large" frames. The new default frame size is the same as the old "standard" frames, 1 MB. The original goal of of the large frames was to reduce the number of temporary files during sorting, but I think we can achieve the same thing by simply merging a larger number of standard frames at once. - Remove the small worker adjustment that was added in apache#14117 to account for an extra frame involved in writing to durable storage. Instead, account for the extra frame whenever we are actually using durable storage. - Cap super-sorter parallelism using the number of output partitions, rather than using a hard coded cap at 4. Note that in practice, so far, this cap has not been relevant for tasks because they have only been using a single processing thread anyway. * Remove unused import. * Fix errorprone annotation. * Fixes for javadocs and inspections. * Additional test coverage. * Fix test.
* MSQ: Rework memory management. This patch reworks memory management to better support multi-threaded workers running in shared JVMs. There are two main changes. First, processing buffers and threads are moved from a per-JVM model to a per-worker model. This enables queries to hold processing buffers without blocking other concurrently-running queries. Changes: - Introduce ProcessingBuffersSet and ProcessingBuffers to hold the per-worker and per-work-order processing buffers (respectively). On Peons, this is the JVM-wide processing pool. On Indexers, this is a per-worker pool of on-heap buffers. (This change fixes a bug on Indexers where excessive processing buffers could be used if MSQ tasks ran concurrently with realtime tasks.) - Add "bufferPool" argument to GroupingEngine#process so a per-worker pool can be passed in. - Add "druid.msq.task.memory.maxThreads" property, which controls the maximum number of processing threads to use per task. This allows usage of multiple processing buffers per task if admins desire. - IndexerWorkerContext acquires processingBuffers when creating the FrameContext for a work order, and releases them when closing the FrameContext. - Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know how many sets of processing buffers are needed to run a given query. Second, adjustments to how WorkerMemoryParameters slices up bundles, to favor more memory for sorting and segment generation. Changes: - Instead of using same-sized bundles for processing and for sorting, workers now use minimally-sized processing bundles (just enough to read inputs plus a little overhead). The rest is devoted to broadcast data buffering, sorting, and segment-building. - Segment-building is now limited to 1 concurrent segment per work order. This allows each segment-building action to use more memory. Note that segment-building is internally multi-threaded to a degree. (Build and persist can run concurrently.) - Simplify frame size calculations by removing the distinction between "standard" and "large" frames. The new default frame size is the same as the old "standard" frames, 1 MB. The original goal of of the large frames was to reduce the number of temporary files during sorting, but I think we can achieve the same thing by simply merging a larger number of standard frames at once. - Remove the small worker adjustment that was added in apache#14117 to account for an extra frame involved in writing to durable storage. Instead, account for the extra frame whenever we are actually using durable storage. - Cap super-sorter parallelism using the number of output partitions, rather than using a hard coded cap at 4. Note that in practice, so far, this cap has not been relevant for tasks because they have only been using a single processing thread anyway. * Remove unused import. * Fix errorprone annotation. * Fixes for javadocs and inspections. * Additional test coverage. * Fix test.
This patch reworks memory management to better support multi-threaded workers running in shared JVMs. There are two main changes. First, processing buffers and threads are moved from a per-JVM model to a per-worker model. This enables queries to hold processing buffers without blocking other concurrently-running queries. Changes: - Introduce ProcessingBuffersSet and ProcessingBuffers to hold the per-worker and per-work-order processing buffers (respectively). On Peons, this is the JVM-wide processing pool. On Indexers, this is a per-worker pool of on-heap buffers. (This change fixes a bug on Indexers where excessive processing buffers could be used if MSQ tasks ran concurrently with realtime tasks.) - Add "bufferPool" argument to GroupingEngine#process so a per-worker pool can be passed in. - Add "druid.msq.task.memory.maxThreads" property, which controls the maximum number of processing threads to use per task. This allows usage of multiple processing buffers per task if admins desire. - IndexerWorkerContext acquires processingBuffers when creating the FrameContext for a work order, and releases them when closing the FrameContext. - Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know how many sets of processing buffers are needed to run a given query. Second, adjustments to how WorkerMemoryParameters slices up bundles, to favor more memory for sorting and segment generation. Changes: - Instead of using same-sized bundles for processing and for sorting, workers now use minimally-sized processing bundles (just enough to read inputs plus a little overhead). The rest is devoted to broadcast data buffering, sorting, and segment-building. - Segment-building is now limited to 1 concurrent segment per work order. This allows each segment-building action to use more memory. Note that segment-building is internally multi-threaded to a degree. (Build and persist can run concurrently.) - Simplify frame size calculations by removing the distinction between "standard" and "large" frames. The new default frame size is the same as the old "standard" frames, 1 MB. The original goal of of the large frames was to reduce the number of temporary files during sorting, but I think we can achieve the same thing by simply merging a larger number of standard frames at once. - Remove the small worker adjustment that was added in #14117 to account for an extra frame involved in writing to durable storage. Instead, account for the extra frame whenever we are actually using durable storage. - Cap super-sorter parallelism using the number of output partitions, rather than using a hard coded cap at 4. Note that in practice, so far, this cap has not been relevant for tasks because they have only been using a single processing thread anyway. Co-authored-by: Gian Merlino <[email protected]>
Description
While running a MSQ job on a worker on a machine with 128MBs of heap size, the job run with composing intermediate channel factories OOMed pretty quickly. This PR fixes the calculations with the worker memory parameters to add a "buffer" of one more frame in SuperSorter in the case of smaller machines.
Key changed/added classes in this PR
WorkerMemoryParameters
This PR has: