Skip to content

Commit

Permalink
Fix memory calculations for WorkerMemoryParameters for machines with …
Browse files Browse the repository at this point in the history
…relatively less heap space (#14117)

* update worker memory parameters
  • Loading branch information
LakshSingla authored May 2, 2023
1 parent 078d5ac commit 387e682
Show file tree
Hide file tree
Showing 2 changed files with 58 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,13 @@ public class WorkerMemoryParameters
*/
private static final long PARTITION_STATS_MEMORY_MAX_BYTES = 300_000_000;

/**
* Threshold in bytes below which we assume that the worker is "small". While calculating the memory requirements for
* a small worker, we try to be as conservative as possible with the estimates and the extra temporary space required by the
* frames, since that can add up quickly and cause OOM.
*/
private static final long SMALL_WORKER_CAPACITY_THRESHOLD_BYTES = 256_000_000;

/**
* Fraction of free memory per bundle that can be used by {@link org.apache.druid.msq.querykit.BroadcastJoinHelper}
* to store broadcast data on-heap. This is used to limit the total size of input frames, which we expect to
Expand Down Expand Up @@ -309,8 +316,32 @@ public static WorkerMemoryParameters createInstance(
)
);

// Apportion max frames to all processors equally, then subtract one to account for an output frame.
final int superSorterMaxChannelsPerProcessor = maxNumFramesForSuperSorter / superSorterMaxActiveProcessors - 1;
final int isSmallWorker = usableMemoryInJvm < SMALL_WORKER_CAPACITY_THRESHOLD_BYTES ? 1 : 0;
// Apportion max frames to all processors equally, then subtract one to account for an output frame and one to account
// for the durable storage's output frame in the supersorter. The extra frame is required in case of durable storage
// since composing output channel factories keep a frame open while writing to them.
// We only account for this extra frame in the workers where the heap size is relatively small to be more
// conservative with the memory estimations. In workers with heap size larger than the frame size, we can get away
// without accounting for this extra frame, and instead better parallelize the supersorter's operations.
final int superSorterMaxChannelsPerProcessor = maxNumFramesForSuperSorter / superSorterMaxActiveProcessors
- 1
- isSmallWorker;
if (superSorterMaxActiveProcessors <= 0) {
throw new MSQException(
new NotEnoughMemoryFault(
calculateSuggestedMinMemoryFromUsableMemory(
estimateUsableMemory(
numWorkersInJvm,
numProcessingThreadsInJvm,
PROCESSING_MINIMUM_BYTES + BUFFER_BYTES_FOR_ESTIMATION + bundleMemoryForInputChannels
), totalLookupFootprint),
maxMemoryInJvm,
usableMemoryInJvm,
numWorkersInJvm,
numProcessingThreadsInJvm
)
);
}

return new WorkerMemoryParameters(
superSorterMaxActiveProcessors,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,31 @@ public void test_fourWorkersInJvm_twoHundredWorkersInCluster()
Assert.assertEquals(new TooManyWorkersFault(125, 124), e2.getFault());
}

/**
 * Exercises parameter creation with a single worker in the JVM: two configurations that are expected to succeed,
 * followed by three configurations that are expected to be rejected with a {@link NotEnoughMemoryFault}.
 */
@Test
public void test_oneWorkerInJvm_smallWorkerCapacity()
{
  // The supersorter's max channels per processor is one less than usual here, to account for the extra frames
  // that are required while creating composing output channels.
  Assert.assertEquals(
      params(1, 3, 27_604_000, 12_360_000, 9_600_000),
      create(128_000_000, 1, 1, 1, 0, 0)
  );
  Assert.assertEquals(
      params(1, 1, 17_956_000, 8_040_000, 9_600_000),
      create(128_000_000, 1, 2, 1, 0, 0)
  );

  // Each of the remaining configurations must fail with an MSQException carrying the expected fault.
  final MSQFault oneWorkerFault =
      Assert.assertThrows(MSQException.class, () -> create(1_000_000_000, 1, 32, 1, 0, 0)).getFault();
  Assert.assertEquals(
      new NotEnoughMemoryFault(1_588_044_000, 1_000_000_000, 750_000_000, 1, 32),
      oneWorkerFault
  );

  final MSQFault smallHeapFault =
      Assert.assertThrows(MSQException.class, () -> create(128_000_000, 1, 4, 1, 0, 0)).getFault();
  Assert.assertEquals(
      new NotEnoughMemoryFault(580_006_666, 128_000_000, 96_000_000, 1, 4),
      smallHeapFault
  );

  final MSQFault twoWorkerFault =
      Assert.assertThrows(MSQException.class, () -> create(1_000_000_000, 2, 32, 1, 0, 0)).getFault();
  Assert.assertEquals(
      new NotEnoughMemoryFault(2_024_045_333, 1_000_000_000, 750_000_000, 2, 32),
      twoWorkerFault
  );
}

@Test
public void test_fourWorkersInJvm_twoHundredWorkersInCluster_hashPartitions()
{
Expand Down

0 comments on commit 387e682

Please sign in to comment.