Skip to content

Commit

Permalink
RunWorkOrder: Account for two simultaneous statistics collectors. (apache#17216)
Browse files Browse the repository at this point in the history

* RunWorkOrder: Account for two simultaneous statistics collectors.

As a follow-up to apache#17057, divide the amount of partitionStatsMemory
by two, to account for the fact that at times there will be two copies of
the full collector. First there will be one for the processors and one for
the accumulated collector. Then, after the processor ones are
GCed, a snapshot of the accumulated collector will be created.

Also includes an optimization to "addAll" for the two KeyCollectors,
for the case where we're adding into an empty collector. This always
happens once per stage because of the "withAccumulation" call.

* Fix missing variable.

* Don't divide by numProcessingThreads twice.

* Fix test.
  • Loading branch information
gianm authored and kfaraz committed Oct 4, 2024
1 parent ad475b0 commit ee77790
Show file tree
Hide file tree
Showing 5 changed files with 33 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1024,7 +1024,8 @@ private ResultAndChannels<?> gatherResultKeyStatistics(final OutputChannels chan
{
final StageDefinition stageDefinition = workOrder.getStageDefinition();
final List<OutputChannel> retVal = new ArrayList<>();
final List<KeyStatisticsCollectionProcessor> processors = new ArrayList<>();
final int numOutputChannels = channels.getAllChannels().size();
final List<KeyStatisticsCollectionProcessor> processors = new ArrayList<>(numOutputChannels);

for (final OutputChannel outputChannel : channels.getAllChannels()) {
final BlockingQueueFrameChannel channel = BlockingQueueFrameChannel.minimal();
Expand All @@ -1037,7 +1038,9 @@ private ResultAndChannels<?> gatherResultKeyStatistics(final OutputChannels chan
stageDefinition.getFrameReader(),
stageDefinition.getClusterBy(),
stageDefinition.createResultKeyStatisticsCollector(
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes()
// Divide by two: half for the per-processor collectors together, half for the combined collector.
// Then divide by numOutputChannels: one portion per processor.
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes() / 2 / numOutputChannels
)
)
);
Expand All @@ -1049,7 +1052,9 @@ private ResultAndChannels<?> gatherResultKeyStatistics(final OutputChannels chan
ProcessorManagers.of(processors)
.withAccumulation(
stageDefinition.createResultKeyStatisticsCollector(
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes()
// Divide by two: half for the per-processor collectors, half for the
// combined collector.
frameContext.memoryParameters().getPartitionStatisticsMaxRetainedBytes() / 2
),
ClusterByStatisticsCollector::addAll
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@ public static WorkerMemoryParameters createInstance(
frameSize,
superSorterConcurrentProcessors,
superSorterMaxChannelsPerMerger,
Math.min(Integer.MAX_VALUE, partitionStatsMemory / numProcessingThreads),
partitionStatsMemory,
hasBroadcastInputs ? computeBroadcastBufferMemory(bundleMemory) : 0
);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,10 +146,13 @@ public void addAll(DistinctKeyCollector other)

if (retainedKeys.isEmpty()) {
this.spaceReductionFactor = other.spaceReductionFactor;
}

for (final Object2LongMap.Entry<RowKey> otherEntry : other.retainedKeys.object2LongEntrySet()) {
add(otherEntry.getKey(), otherEntry.getLongValue());
this.retainedKeys.putAll(other.retainedKeys);
this.maxBytes = other.maxBytes;
this.totalWeightUnadjusted = other.totalWeightUnadjusted;
} else {
for (final Object2LongMap.Entry<RowKey> otherEntry : other.retainedKeys.object2LongEntrySet()) {
add(otherEntry.getKey(), otherEntry.getLongValue());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@ public void addAll(QuantilesSketchKeyCollector other)
double otherBytesCount = other.averageKeyLength * other.getSketch().getN();
averageKeyLength = ((sketchBytesCount + otherBytesCount) / (sketch.getN() + other.sketch.getN()));

union.union(sketch);
if (!sketch.isEmpty()) {
union.union(sketch);
}
union.union(other.sketch);
sketch = union.getResultAndReset();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ public void test_1WorkerInJvm_alone_4Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(892_000_000, frameSize, 4, 199, 22_300_000, 0),
new WorkerMemoryParameters(892_000_000, frameSize, 4, 199, 89_200_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand All @@ -111,7 +111,7 @@ public void test_1WorkerInJvm_alone_withBroadcast_4Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(592_000_000, frameSize, 4, 132, 14_800_000, 200_000_000),
new WorkerMemoryParameters(592_000_000, frameSize, 4, 132, 59_200_000, 200_000_000),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand Down Expand Up @@ -145,7 +145,7 @@ public void test_1WorkerInJvm_alone_2ConcurrentStages_4Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(392_000_000, frameSize, 4, 87, 9_800_000, 0),
new WorkerMemoryParameters(392_000_000, frameSize, 4, 87, 39_200_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
);
}
Expand All @@ -162,7 +162,7 @@ public void test_1WorkerInJvm_alone_2ConcurrentStages_4Threads_highHeap()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(2_392_000_000L, frameSize, 4, 537, 59_800_000, 0),
new WorkerMemoryParameters(2_392_000_000L, frameSize, 4, 537, 239_200_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
);
}
Expand All @@ -179,7 +179,7 @@ public void test_1WorkerInJvm_alone_32Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(136_000_000, frameSize, 32, 2, 425_000, 0),
new WorkerMemoryParameters(136_000_000, frameSize, 32, 2, 13_600_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand All @@ -196,7 +196,7 @@ public void test_1WorkerInJvm_alone_33Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(109_000_000, frameSize, 32, 2, 330_303, 0),
new WorkerMemoryParameters(109_000_000, frameSize, 32, 2, 10_900_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand Down Expand Up @@ -276,7 +276,7 @@ public void test_1WorkerInJvm_alone_40Threads_memoryFromError()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(13_000_000, frameSize, 1, 2, 250_000, 0),
new WorkerMemoryParameters(13_000_000, frameSize, 1, 2, 10_000_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand Down Expand Up @@ -325,7 +325,7 @@ public void test_1WorkerInJvm_alone_40Threads_2ConcurrentStages_memoryFromError(
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(13_000_000, frameSize, 1, 2, 250_000, 0),
new WorkerMemoryParameters(13_000_000, frameSize, 1, 2, 10_000_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
);
}
Expand All @@ -342,7 +342,7 @@ public void test_1WorkerInJvm_200WorkersInCluster_4Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(1_096_000_000, frameSize, 4, 245, 27_400_000, 0),
new WorkerMemoryParameters(1_096_000_000, frameSize, 4, 245, 109_600_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand All @@ -359,7 +359,7 @@ public void test_1WorkerInJvm_200WorkersInCluster_4Threads_2OutputPartitions()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(1_548_000_000, frameSize, 4, 347, 38_700_000, 0),
new WorkerMemoryParameters(1_548_000_000, frameSize, 4, 347, 154_800_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand All @@ -376,7 +376,7 @@ public void test_1WorkerInJvm_200WorkersInCluster_2ConcurrentStages_4Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(96_000_000, frameSize, 4, 20, 2_500_000, 0),
new WorkerMemoryParameters(96_000_000, frameSize, 4, 20, 10_000_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
);
}
Expand All @@ -393,7 +393,7 @@ public void test_12WorkersInJvm_200WorkersInCluster_64Threads_4OutputPartitions(
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(1_762_666_666, frameSize, 64, 23, 2_754_166, 0),
new WorkerMemoryParameters(1_762_666_666, frameSize, 64, 23, 176_266_666, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 1, 1)
);
}
Expand All @@ -410,7 +410,7 @@ public void test_12WorkersInJvm_200WorkersInCluster_2ConcurrentStages_64Threads_
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(429_333_333, frameSize, 64, 5, 670_833, 0),
new WorkerMemoryParameters(429_333_333, frameSize, 64, 5, 42_933_333, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
);
}
Expand All @@ -428,7 +428,7 @@ public void test_1WorkerInJvm_MaxWorkersInCluster_2ConcurrentStages_2Threads()
final ShuffleSpec shuffleSpec = makeSortShuffleSpec();

Assert.assertEquals(
new WorkerMemoryParameters(448_000_000, frameSize, 2, 200, 22_400_000, 0),
new WorkerMemoryParameters(448_000_000, frameSize, 2, 200, 44_800_000, 0),
WorkerMemoryParameters.createInstance(memoryIntrospector, frameSize, slices, broadcastInputs, shuffleSpec, 2, 1)
);
}
Expand Down

0 comments on commit ee77790

Please sign in to comment.