Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MSQ: Rework memory management. #17057

Merged
merged 7 commits into from
Sep 14, 2024
Merged

MSQ: Rework memory management. #17057

merged 7 commits into from
Sep 14, 2024

Conversation

gianm
Copy link
Contributor

@gianm gianm commented Sep 13, 2024

This patch reworks memory management to better support multi-threaded workers running in shared JVMs. There are two main changes.

First, processing buffers and threads are moved from a per-JVM model to a per-worker model. This enables queries to hold processing buffers without blocking other concurrently-running queries. Changes:

  • Introduce ProcessingBuffersSet and ProcessingBuffers to hold the per-worker and per-work-order processing buffers (respectively). On Peons, this is the JVM-wide processing pool. On Indexers, this is a per-worker pool of on-heap buffers. (This change fixes a bug on Indexers where excessive processing buffers could be used if MSQ tasks ran concurrently with realtime tasks.)

  • Add "bufferPool" argument to GroupingEngine#process so a per-worker pool can be passed in.

  • Add "druid.msq.task.memory.maxThreads" property, which controls the maximum number of processing threads to use per task. This allows usage of multiple processing buffers per task if admins desire.

  • IndexerWorkerContext acquires processingBuffers when creating the FrameContext for a work order, and releases them when closing the FrameContext.

  • Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know how many sets of processing buffers are needed to run a given query.

Second, adjustments to how WorkerMemoryParameters slices up bundles, to favor more memory for sorting and segment generation. Changes:

  • Instead of using same-sized bundles for processing and for sorting, workers now use minimally-sized processing bundles (just enough to read inputs plus a little overhead). The rest is devoted to broadcast data buffering, sorting, and segment-building.

  • Segment-building is now limited to 1 concurrent segment per work order. This allows each segment-building action to use more memory. Note that segment-building is internally multi-threaded to a degree. (Build and persist can run concurrently.)

  • Simplify frame size calculations by removing the distinction between "standard" and "large" frames. The new default frame size is the same as the old "standard" frames, 1 MB. The original goal of of the large frames was to reduce the number of temporary files during sorting, but I think we can achieve the same thing by simply merging a larger number of standard frames at once.

  • Remove the small worker adjustment that was added in Fix memory calculations for WorkerMemoryParameters for machines with relatively less heap space #14117 to account for an extra frame involved in writing to durable storage. Instead, account for the extra frame whenever we are actually using durable storage.

  • Cap super-sorter parallelism using the number of output partitions, rather than using a hard coded cap at 4. Note that in practice, so far, this cap has not been relevant for tasks because they have only been using a single processing thread anyway.

This patch reworks memory management to better support multi-threaded
workers running in shared JVMs. There are two main changes.

First, processing buffers and threads are moved from a per-JVM model to
a per-worker model. This enables queries to hold processing buffers
without blocking other concurrently-running queries. Changes:

- Introduce ProcessingBuffersSet and ProcessingBuffers to hold the
  per-worker and per-work-order processing buffers (respectively). On Peons,
  this is the JVM-wide processing pool. On Indexers, this is a per-worker
  pool of on-heap buffers. (This change fixes a bug on Indexers where
  excessive processing buffers could be used if MSQ tasks ran concurrently
  with realtime tasks.)

- Add "bufferPool" argument to GroupingEngine#process so a per-worker pool
  can be passed in.

- Add "druid.msq.task.memory.maxThreads" property, which controls the
  maximum number of processing threads to use per task. This allows usage of
  multiple processing buffers per task if admins desire.

- IndexerWorkerContext acquires processingBuffers when creating the FrameContext
  for a work order, and releases them when closing the FrameContext.

- Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know
  how many sets of processing buffers are needed to run a given query.

Second, adjustments to how WorkerMemoryParameters slices up bundles, to
favor more memory for sorting and segment generation. Changes:

- Instead of using same-sized bundles for processing and for sorting,
  workers now use minimally-sized processing bundles (just enough to read
  inputs plus a little overhead). The rest is devoted to broadcast data
  buffering, sorting, and segment-building.

- Segment-building is now limited to 1 concurrent segment per work order.
  This allows each segment-building action to use more memory. Note that
  segment-building is internally multi-threaded to a degree. (Build and
  persist can run concurrently.)

- Simplify frame size calculations by removing the distinction between
  "standard" and "large" frames. The new default frame size is the same
  as the old "standard" frames, 1 MB. The original goal of of the large
  frames was to reduce the number of temporary files during sorting, but
  I think we can achieve the same thing by simply merging a larger number
  of standard frames at once.

- Remove the small worker adjustment that was added in apache#14117 to account
  for an extra frame involved in writing to durable storage. Instead,
  account for the extra frame whenever we are actually using durable storage.

- Cap super-sorter parallelism using the number of output partitions, rather
  than using a hard coded cap at 4. Note that in practice, so far, this cap
  has not been relevant for tasks because they have only been using a single
  processing thread anyway.
@github-actions github-actions bot added Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262 labels Sep 13, 2024
Copy link
Member

@clintropolis clintropolis left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

i think there is a soft conflict with #17058 since it added a test that makes a grouping engine and calls process

*
* @return result sequence for the cursor factory
*/
public Sequence<ResultRow> process(
GroupByQuery query,
CursorFactory cursorFactory,
@Nullable TimeBoundaryInspector timeBoundaryInspector,
NonBlockingPool<ByteBuffer> bufferPool,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it does feel kind of weird to pass in some resources to the process method since like part of the resources are still associated with the engine via GroupByResourcesReservationPool. I'm not entirely sure what is better to do though, so i don't think this is a blocker or anything

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could provide the GroupByResourcesReservationPool to process also, I suppose.

@gianm
Copy link
Contributor Author

gianm commented Sep 13, 2024

👍

i think there is a soft conflict with #17058 since it added a test that makes a grouping engine and calls process

There is, I've just fixed it.

@gianm gianm merged commit fd6706c into apache:master Sep 14, 2024
90 checks passed
@gianm gianm deleted the msq-memory-changes branch September 14, 2024 22:35
gianm added a commit to gianm/druid that referenced this pull request Sep 14, 2024
clintropolis pushed a commit that referenced this pull request Sep 15, 2024
pranavbhole pushed a commit to pranavbhole/druid that referenced this pull request Sep 17, 2024
* MSQ: Rework memory management.

This patch reworks memory management to better support multi-threaded
workers running in shared JVMs. There are two main changes.

First, processing buffers and threads are moved from a per-JVM model to
a per-worker model. This enables queries to hold processing buffers
without blocking other concurrently-running queries. Changes:

- Introduce ProcessingBuffersSet and ProcessingBuffers to hold the
  per-worker and per-work-order processing buffers (respectively). On Peons,
  this is the JVM-wide processing pool. On Indexers, this is a per-worker
  pool of on-heap buffers. (This change fixes a bug on Indexers where
  excessive processing buffers could be used if MSQ tasks ran concurrently
  with realtime tasks.)

- Add "bufferPool" argument to GroupingEngine#process so a per-worker pool
  can be passed in.

- Add "druid.msq.task.memory.maxThreads" property, which controls the
  maximum number of processing threads to use per task. This allows usage of
  multiple processing buffers per task if admins desire.

- IndexerWorkerContext acquires processingBuffers when creating the FrameContext
  for a work order, and releases them when closing the FrameContext.

- Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know
  how many sets of processing buffers are needed to run a given query.

Second, adjustments to how WorkerMemoryParameters slices up bundles, to
favor more memory for sorting and segment generation. Changes:

- Instead of using same-sized bundles for processing and for sorting,
  workers now use minimally-sized processing bundles (just enough to read
  inputs plus a little overhead). The rest is devoted to broadcast data
  buffering, sorting, and segment-building.

- Segment-building is now limited to 1 concurrent segment per work order.
  This allows each segment-building action to use more memory. Note that
  segment-building is internally multi-threaded to a degree. (Build and
  persist can run concurrently.)

- Simplify frame size calculations by removing the distinction between
  "standard" and "large" frames. The new default frame size is the same
  as the old "standard" frames, 1 MB. The original goal of of the large
  frames was to reduce the number of temporary files during sorting, but
  I think we can achieve the same thing by simply merging a larger number
  of standard frames at once.

- Remove the small worker adjustment that was added in apache#14117 to account
  for an extra frame involved in writing to durable storage. Instead,
  account for the extra frame whenever we are actually using durable storage.

- Cap super-sorter parallelism using the number of output partitions, rather
  than using a hard coded cap at 4. Note that in practice, so far, this cap
  has not been relevant for tasks because they have only been using a single
  processing thread anyway.

* Remove unused import.

* Fix errorprone annotation.

* Fixes for javadocs and inspections.

* Additional test coverage.

* Fix test.
pranavbhole pushed a commit to pranavbhole/druid that referenced this pull request Sep 17, 2024
@kfaraz kfaraz added this to the 31.0.0 milestone Sep 30, 2024
kfaraz pushed a commit to kfaraz/druid that referenced this pull request Oct 1, 2024
* MSQ: Rework memory management.

This patch reworks memory management to better support multi-threaded
workers running in shared JVMs. There are two main changes.

First, processing buffers and threads are moved from a per-JVM model to
a per-worker model. This enables queries to hold processing buffers
without blocking other concurrently-running queries. Changes:

- Introduce ProcessingBuffersSet and ProcessingBuffers to hold the
  per-worker and per-work-order processing buffers (respectively). On Peons,
  this is the JVM-wide processing pool. On Indexers, this is a per-worker
  pool of on-heap buffers. (This change fixes a bug on Indexers where
  excessive processing buffers could be used if MSQ tasks ran concurrently
  with realtime tasks.)

- Add "bufferPool" argument to GroupingEngine#process so a per-worker pool
  can be passed in.

- Add "druid.msq.task.memory.maxThreads" property, which controls the
  maximum number of processing threads to use per task. This allows usage of
  multiple processing buffers per task if admins desire.

- IndexerWorkerContext acquires processingBuffers when creating the FrameContext
  for a work order, and releases them when closing the FrameContext.

- Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know
  how many sets of processing buffers are needed to run a given query.

Second, adjustments to how WorkerMemoryParameters slices up bundles, to
favor more memory for sorting and segment generation. Changes:

- Instead of using same-sized bundles for processing and for sorting,
  workers now use minimally-sized processing bundles (just enough to read
  inputs plus a little overhead). The rest is devoted to broadcast data
  buffering, sorting, and segment-building.

- Segment-building is now limited to 1 concurrent segment per work order.
  This allows each segment-building action to use more memory. Note that
  segment-building is internally multi-threaded to a degree. (Build and
  persist can run concurrently.)

- Simplify frame size calculations by removing the distinction between
  "standard" and "large" frames. The new default frame size is the same
  as the old "standard" frames, 1 MB. The original goal of of the large
  frames was to reduce the number of temporary files during sorting, but
  I think we can achieve the same thing by simply merging a larger number
  of standard frames at once.

- Remove the small worker adjustment that was added in apache#14117 to account
  for an extra frame involved in writing to durable storage. Instead,
  account for the extra frame whenever we are actually using durable storage.

- Cap super-sorter parallelism using the number of output partitions, rather
  than using a hard coded cap at 4. Note that in practice, so far, this cap
  has not been relevant for tasks because they have only been using a single
  processing thread anyway.

* Remove unused import.

* Fix errorprone annotation.

* Fixes for javadocs and inspections.

* Additional test coverage.

* Fix test.
kfaraz added a commit that referenced this pull request Oct 1, 2024
This patch reworks memory management to better support multi-threaded
workers running in shared JVMs. There are two main changes.

First, processing buffers and threads are moved from a per-JVM model to
a per-worker model. This enables queries to hold processing buffers
without blocking other concurrently-running queries. Changes:

- Introduce ProcessingBuffersSet and ProcessingBuffers to hold the
  per-worker and per-work-order processing buffers (respectively). On Peons,
  this is the JVM-wide processing pool. On Indexers, this is a per-worker
  pool of on-heap buffers. (This change fixes a bug on Indexers where
  excessive processing buffers could be used if MSQ tasks ran concurrently
  with realtime tasks.)

- Add "bufferPool" argument to GroupingEngine#process so a per-worker pool
  can be passed in.

- Add "druid.msq.task.memory.maxThreads" property, which controls the
  maximum number of processing threads to use per task. This allows usage of
  multiple processing buffers per task if admins desire.

- IndexerWorkerContext acquires processingBuffers when creating the FrameContext
  for a work order, and releases them when closing the FrameContext.

- Add "usesProcessingBuffers()" to FrameProcessorFactory so workers know
  how many sets of processing buffers are needed to run a given query.

Second, adjustments to how WorkerMemoryParameters slices up bundles, to
favor more memory for sorting and segment generation. Changes:

- Instead of using same-sized bundles for processing and for sorting,
  workers now use minimally-sized processing bundles (just enough to read
  inputs plus a little overhead). The rest is devoted to broadcast data
  buffering, sorting, and segment-building.

- Segment-building is now limited to 1 concurrent segment per work order.
  This allows each segment-building action to use more memory. Note that
  segment-building is internally multi-threaded to a degree. (Build and
  persist can run concurrently.)

- Simplify frame size calculations by removing the distinction between
  "standard" and "large" frames. The new default frame size is the same
  as the old "standard" frames, 1 MB. The original goal of of the large
  frames was to reduce the number of temporary files during sorting, but
  I think we can achieve the same thing by simply merging a larger number
  of standard frames at once.

- Remove the small worker adjustment that was added in #14117 to account
  for an extra frame involved in writing to durable storage. Instead,
  account for the extra frame whenever we are actually using durable storage.

- Cap super-sorter parallelism using the number of output partitions, rather
  than using a hard coded cap at 4. Note that in practice, so far, this cap
  has not been relevant for tasks because they have only been using a single
  processing thread anyway.

Co-authored-by: Gian Merlino <[email protected]>
gianm added a commit to gianm/druid that referenced this pull request Oct 2, 2024
As a follow up to apache#17057, divide the amount of partitionStatsMemory
by two, to account for the fact that there are at some times going to
be two copies of the full collector. First there will be one for processors
and one for the accumulated collector. Then, after the processor ones are
GCed, a snapshot of the accumulated collector will be created.

Also includes an optimization to "addAll" for the two KeyCollectors,
for the case where we're adding into an empty collector. This is always
going to happen once per stage due to the "withAccumulation" call.
abhishekagarwal87 pushed a commit that referenced this pull request Oct 3, 2024
…7216)

* RunWorkOrder: Account for two simultaneous statistics collectors.

As a follow up to #17057, divide the amount of partitionStatsMemory
by two, to account for the fact that there are at some times going to
be two copies of the full collector. First there will be one for processors
and one for the accumulated collector. Then, after the processor ones are
GCed, a snapshot of the accumulated collector will be created.

Also includes an optimization to "addAll" for the two KeyCollectors,
for the case where we're adding into an empty collector. This is always
going to happen once per stage due to the "withAccumulation" call.

* Fix missing variable.

* Don't divide by numProcessingThreads twice.

* Fix test.
kfaraz pushed a commit to kfaraz/druid that referenced this pull request Oct 4, 2024
kfaraz pushed a commit to kfaraz/druid that referenced this pull request Oct 4, 2024
…ache#17216)

* RunWorkOrder: Account for two simultaneous statistics collectors.

As a follow up to apache#17057, divide the amount of partitionStatsMemory
by two, to account for the fact that there are at some times going to
be two copies of the full collector. First there will be one for processors
and one for the accumulated collector. Then, after the processor ones are
GCed, a snapshot of the accumulated collector will be created.

Also includes an optimization to "addAll" for the two KeyCollectors,
for the case where we're adding into an empty collector. This is always
going to happen once per stage due to the "withAccumulation" call.

* Fix missing variable.

* Don't divide by numProcessingThreads twice.

* Fix test.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Area - Batch Ingestion Area - MSQ For multi stage queries - https://github.com/apache/druid/issues/12262
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants